
Merge branch 'release-57' into sync-r57-m-20240617

Thales Macedo Garitezi, 1 year ago
commit 20cffb54d4
30 changed files with 1034 additions and 322 deletions
  1. +3 -0    .ci/docker-compose-file/kafka/kafka-entrypoint.sh
  2. +2 -0    apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
  3. +46 -5   apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl
  4. +1 -1    apps/emqx_bridge_azure_event_hub/rebar.config
  5. +1 -1    apps/emqx_bridge_confluent/rebar.config
  6. +1 -1    apps/emqx_bridge_kafka/rebar.config
  7. +9 -4    apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
  8. +27 -3   apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl
  9. +2 -2    apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
  10. +69 -7   apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl
  11. +16 -4   apps/emqx_durable_storage/src/emqx_ds_lts.erl
  12. +25 -21  apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
  13. +67 -87  apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
  14. +2 -2    apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
  15. +1 -1    apps/emqx_durable_storage/src/emqx_durable_storage.app.src
  16. +29 -13  apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl
  17. +1 -0    apps/emqx_machine/priv/reboot_lists.eterm
  18. +1 -1    apps/emqx_machine/src/emqx_machine.app.src
  19. +177 -7  apps/emqx_machine/src/user_default.erl
  20. +2 -1    apps/emqx_message_transformation/src/emqx_message_transformation.app.src
  21. +182 -109 apps/emqx_message_transformation/src/emqx_message_transformation.erl
  22. +147 -0  apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl
  23. +122 -0  apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl
  24. +73 -50  apps/emqx_postgresql/src/emqx_postgresql.erl
  25. +1 -1    apps/emqx_utils/src/emqx_variform.erl
  26. +22 -0   changes/ce/feat-13191.en.md
  27. +1 -0    changes/ce/fix-13276.en.md
  28. +0 -0    changes/ee/fix-13277.en.md
  29. +1 -1    mix.exs
  30. +3 -0    rel/i18n/emqx_message_transformation_http_api.hocon

+ 3 - 0
.ci/docker-compose-file/kafka/kafka-entrypoint.sh

@@ -49,6 +49,9 @@ echo "+++++++ Creating Kafka Topics ++++++++"
 # there seem to be a race condition when creating the topics (too early)
 env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh
 
+# create a topic with max.message.bytes=100
+/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100
+
 echo "+++++++ Wait until Kafka ports are down ++++++++"
 
 bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1

+ 2 - 0
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -85,9 +85,11 @@ end_per_testcase(TestCase, Config) when
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
+    snabbkaffe:stop(),
     ok;
 end_per_testcase(_TestCase, _Config) ->
     emqx_common_test_helpers:call_janitor(60_000),
+    snabbkaffe:stop(),
     ok.
 
 %%------------------------------------------------------------------------------

+ 46 - 5
apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl

@@ -30,15 +30,28 @@
 
 -define(PATH, [authentication]).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 all() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    TCs = AllTCs -- require_seeds_tests(),
     [
-        {group, require_seeds},
-        t_update_with_invalid_config,
-        t_update_with_bad_config_value
+        {group, require_seeds}
+        | TCs
     ].
 
 groups() ->
-    [{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}].
+    [{require_seeds, [], require_seeds_tests()}].
+
+require_seeds_tests() ->
+    [
+        t_create,
+        t_authenticate,
+        t_authenticate_disabled_prepared_statements,
+        t_update,
+        t_destroy,
+        t_is_superuser
+    ].
 
 init_per_testcase(_, Config) ->
     emqx_authn_test_lib:delete_authenticators(
@@ -47,6 +60,10 @@ init_per_testcase(_, Config) ->
     ),
     Config.
 
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
 init_per_group(require_seeds, Config) ->
     ok = init_seeds(),
     Config.
@@ -70,7 +87,12 @@ init_per_suite(Config) ->
             ),
             [{apps, Apps} | Config];
         false ->
-            {skip, no_pgsql}
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_postgres);
+                _ ->
+                    {skip, no_postgres}
+            end
     end.
 
 end_per_suite(Config) ->
@@ -174,6 +196,25 @@ test_user_auth(#{
         ?GLOBAL
     ).
 
+t_authenticate_disabled_prepared_statements(Config) ->
+    ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}),
+    {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig),
+    on_exit(fun() ->
+        emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config())
+    end),
+    ok = lists:foreach(
+        fun(Sample0) ->
+            Sample = maps:update_with(
+                config_params,
+                fun(Cfg) -> Cfg#{<<"disable_prepared_statements">> => true} end,
+                Sample0
+            ),
+            ct:pal("test_user_auth sample: ~p", [Sample]),
+            test_user_auth(Sample)
+        end,
+        user_seeds()
+    ).
+
 t_destroy(_Config) ->
     AuthConfig = raw_pgsql_auth_config(),
 

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 9 - 4
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -490,12 +490,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
 %% Wolff producer never gives up retrying
 %% so there can only be 'ok' results.
 on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) ->
-    %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2
+    %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2
     apply(ReplyFn, Args ++ [ok]);
 on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
-    %% wolff should bump the dropped_queue_full counter
-    %% do not apply the callback (which is basically to bump success or fail counter)
-    ok.
+    %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4
+    %% so there is no need to apply the callback here
+    ok;
+on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) ->
+    %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4.
+    %% however 'dropped' is not mapped to EMQX metrics name
+    %% so we reply error here
+    apply(ReplyFn, Args ++ [{error, message_too_large}]).
 
 %% Note: since wolff client has its own replayq that is not managed by
 %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here.  Otherwise,

+ 27 - 3
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) ->
     emqx_utils_maps:deep_merge(Cfg1, Overrides).
 
 bridge_v2_config(ConnectorName) ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    bridge_v2_config(ConnectorName, KafkaTopic).
+
+bridge_v2_config(ConnectorName, KafkaTopic) ->
     #{
         <<"connector">> => ConnectorName,
         <<"enable">> => true,
@@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) ->
             <<"query_mode">> => <<"sync">>,
             <<"required_acks">> => <<"all_isr">>,
             <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => list_to_binary(
-                emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
-            )
+            <<"topic">> => list_to_binary(KafkaTopic)
         },
         <<"local_topic">> => <<"kafka_t/#">>,
         <<"resource_opts">> => #{
@@ -378,6 +380,28 @@ t_local_topic(_) ->
     ok = emqx_connector:remove(?TYPE, test_connector),
     ok.
 
+t_message_too_large(_) ->
+    BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"),
+    ConnectorConfig = connector_config(),
+    {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig),
+    BridgeName = test_bridge4,
+    {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config),
+    BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName),
+    TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)),
+    ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)),
+    emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)),
+    ?retry(
+        _Sleep0 = 50,
+        _Attempts0 = 100,
+        begin
+            ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)),
+            ok
+        end
+    ),
+    ok = emqx_bridge_v2:remove(?TYPE, BridgeName),
+    ok = emqx_connector:remove(?TYPE, test_connector4),
+    ok.
+
 t_unknown_topic(_Config) ->
     ConnectorName = <<"test_connector">>,
     BridgeName = <<"test_bridge">>,

+ 2 - 2
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -601,7 +601,7 @@ t_simple_sql_query(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {sql, <<"SELECT count(1) AS T">>},
+    Request = {query, <<"SELECT count(1) AS T">>},
     Result =
         case QueryMode of
             sync ->
@@ -651,7 +651,7 @@ t_bad_sql_parameter(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {sql, <<"">>, [bad_parameter]},
+    Request = {query, <<"">>, [bad_parameter]},
     Result =
         case QueryMode of
             sync ->

+ 69 - 7
apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl

@@ -102,6 +102,18 @@ init_per_group(Group, Config) when
         {connector_type, group_to_type(Group)}
         | Config
     ];
+init_per_group(batch_enabled, Config) ->
+    [
+        {batch_size, 10},
+        {batch_time, <<"10ms">>}
+        | Config
+    ];
+init_per_group(batch_disabled, Config) ->
+    [
+        {batch_size, 1},
+        {batch_time, <<"0ms">>}
+        | Config
+    ];
 init_per_group(_Group, Config) ->
     Config.
 
@@ -262,16 +274,66 @@ t_start_action_or_source_with_disabled_connector(Config) ->
     ok.
 
 t_disable_prepared_statements(matrix) ->
-    [[postgres], [timescale], [matrix]];
+    [
+        [postgres, batch_disabled],
+        [postgres, batch_enabled],
+        [timescale, batch_disabled],
+        [timescale, batch_enabled],
+        [matrix, batch_disabled],
+        [matrix, batch_enabled]
+    ];
 t_disable_prepared_statements(Config0) ->
+    BatchSize = ?config(batch_size, Config0),
+    BatchTime = ?config(batch_time, Config0),
     ConnectorConfig0 = ?config(connector_config, Config0),
     ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}),
-    Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
-    ok = emqx_bridge_v2_testlib:t_sync_query(
-        Config,
-        fun make_message/0,
-        fun(Res) -> ?assertMatch({ok, _}, Res) end,
-        postgres_bridge_connector_on_query_return
+    BridgeConfig0 = ?config(bridge_config, Config0),
+    BridgeConfig = emqx_utils_maps:deep_merge(
+        BridgeConfig0,
+        #{
+            <<"resource_opts">> => #{
+                <<"batch_size">> => BatchSize,
+                <<"batch_time">> => BatchTime,
+                <<"query_mode">> => <<"async">>
+            }
+        }
+    ),
+    Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}),
+    Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}),
+    ?check_trace(
+        #{timetrap => 5_000},
+        begin
+            ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)),
+            RuleTopic = <<"t/postgres">>,
+            Type = ?config(bridge_type, Config),
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config),
+            ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+            {ok, C} = emqtt:start_link(),
+            {ok, _} = emqtt:connect(C),
+            lists:foreach(
+                fun(N) ->
+                    emqtt:publish(C, RuleTopic, integer_to_binary(N))
+                end,
+                lists:seq(1, BatchSize)
+            ),
+            case BatchSize > 1 of
+                true ->
+                    ?block_until(#{
+                        ?snk_kind := "postgres_success_batch_result",
+                        row_count := BatchSize
+                    }),
+                    ok;
+                false ->
+                    ok
+            end,
+            ok
+        end,
+        []
     ),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),

+ 16 - 4
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -20,6 +20,7 @@
 -export([
     trie_create/1, trie_create/0,
     destroy/1,
+    trie_dump/2,
     trie_restore/2,
     trie_update/2,
     trie_copy_learned_paths/2,
@@ -76,6 +77,8 @@
         static_key_size => pos_integer()
     }.
 
+-type dump() :: [{_Key, _Val}].
+
 -record(trie, {
     persist :: persist_callback(),
     static_key_size :: pos_integer(),
@@ -125,12 +128,12 @@ destroy(#trie{trie = Trie, stats = Stats}) ->
     ok.
 
 %% @doc Restore trie from a dump
--spec trie_restore(options(), [{_Key, _Val}]) -> trie().
+-spec trie_restore(options(), dump()) -> trie().
 trie_restore(Options, Dump) ->
     trie_update(trie_create(Options), Dump).
 
 %% @doc Update a trie with a dump of operations (used for replication)
--spec trie_update(trie(), [{_Key, _Val}]) -> trie().
+-spec trie_update(trie(), dump()) -> trie().
 trie_update(Trie, Dump) ->
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
@@ -140,14 +143,23 @@ trie_update(Trie, Dump) ->
     ),
     Trie.
 
+-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump().
+trie_dump(Trie, Filter) ->
+    case Filter of
+        all ->
+            Fun = fun(_) -> true end;
+        wildcard ->
+            Fun = fun contains_wildcard/1
+    end,
+    lists:append([P || P <- paths(Trie), Fun(P)]).
+
 -spec trie_copy_learned_paths(trie(), trie()) -> trie().
 trie_copy_learned_paths(OldTrie, NewTrie) ->
-    WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)],
     lists:foreach(
         fun({{StateFrom, Token}, StateTo}) ->
             trie_insert(NewTrie, StateFrom, Token, StateTo)
         end,
-        lists:flatten(WildcardPaths)
+        trie_dump(OldTrie, wildcard)
     ),
     NewTrie.
 

+ 25 - 21
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -25,7 +25,7 @@
 
 %% behavior callbacks:
 -export([
-    create/4,
+    create/5,
     open/5,
     drop/5,
     prepare_batch/4,
@@ -37,7 +37,6 @@
     update_iterator/4,
     next/6,
     delete_next/6,
-    post_creation_actions/1,
 
     handle_event/4
 ]).
@@ -179,10 +178,11 @@
     emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
     emqx_ds_storage_layer:gen_id(),
-    options()
+    options(),
+    _PrevGeneration :: s() | undefined
 ) ->
     {schema(), emqx_ds_storage_layer:cf_refs()}.
-create(_ShardId, DBHandle, GenId, Options) ->
+create(_ShardId, DBHandle, GenId, Options, SPrev) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
@@ -193,6 +193,14 @@ create(_ShardId, DBHandle, GenId, Options) ->
     TrieCFName = trie_cf(GenId),
     {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []),
     {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []),
+    case SPrev of
+        #s{trie = TriePrev} ->
+            ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev),
+            ?tp(bitfield_lts_inherited_trie, #{}),
+            ok;
+        undefined ->
+            ok
+    end,
     %% Create schema:
     Schema = #{
         bits_per_wildcard_level => BitsPerTopicLevel,
@@ -241,20 +249,6 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
         gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}])
     }.
 
--spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
-    s().
-post_creation_actions(
-    #{
-        new_gen_runtime_data := NewGenData,
-        old_gen_runtime_data := OldGenData
-    }
-) ->
-    #s{trie = OldTrie} = OldGenData,
-    #s{trie = NewTrie0} = NewGenData,
-    NewTrie = copy_previous_trie(OldTrie, NewTrie0),
-    ?tp(bitfield_lts_inherited_trie, #{}),
-    NewGenData#s{trie = NewTrie}.
-
 -spec drop(
     emqx_ds_storage_layer:shard_id(),
     rocksdb:db_handle(),
@@ -905,9 +899,19 @@ restore_trie(TopicIndexBytes, DB, CF) ->
         rocksdb:iterator_close(IT)
     end.
 
--spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie().
-copy_previous_trie(OldTrie, NewTrie) ->
-    emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie).
+-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) ->
+    ok.
+copy_previous_trie(DB, TrieCF, TriePrev) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun({Key, Val}) ->
+            ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val))
+        end,
+        emqx_ds_lts:trie_dump(TriePrev, wildcard)
+    ),
+    Result = rocksdb:write_batch(DB, Batch, []),
+    rocksdb:release_batch(Batch),
+    Result.
 
 read_persisted_trie(IT, {ok, KeyB, ValB}) ->
     [

+ 67 - 87
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -69,7 +69,6 @@
     shard_id/0,
     options/0,
     prototype/0,
-    post_creation_context/0,
     cooked_batch/0
 ]).
 
@@ -169,11 +168,14 @@
     until := emqx_ds:time() | undefined
 }.
 
+%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function.
+-type generation_data() :: term().
+
 %% Schema for a generation. Persistent term.
 -type generation_schema() :: generation(term()).
 
 %% Runtime view of generation:
--type generation() :: generation(term()).
+-type generation() :: generation(generation_data()).
 
 %%%% Shard:
 
@@ -194,38 +196,32 @@
 
 -type options() :: map().
 
--type post_creation_context() ::
-    #{
-        shard_id := emqx_ds_storage_layer:shard_id(),
-        db := rocksdb:db_handle(),
-        new_gen_id := emqx_ds_storage_layer:gen_id(),
-        old_gen_id := emqx_ds_storage_layer:gen_id(),
-        new_cf_refs := cf_refs(),
-        old_cf_refs := cf_refs(),
-        new_gen_runtime_data := _NewData,
-        old_gen_runtime_data := _OldData
-    }.
-
 %%================================================================================
 %% Generation callbacks
 %%================================================================================
 
 %% Create the new schema given generation id and the options.
 %% Create rocksdb column families.
--callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) ->
+-callback create(
+    shard_id(),
+    rocksdb:db_handle(),
+    gen_id(),
+    Options :: map(),
+    generation_data() | undefined
+) ->
     {_Schema, cf_refs()}.
 
 %% Open the existing schema
 -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
-    _Data.
+    generation_data().
 
 %% Delete the schema and data
--callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) ->
+-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) ->
     ok | {error, _Reason}.
 
 -callback prepare_batch(
     shard_id(),
-    _Data,
+    generation_data(),
     [{emqx_ds:time(), emqx_types:message()}, ...],
     emqx_ds:message_store_opts()
 ) ->
@@ -233,34 +229,44 @@
 
 -callback commit_batch(
     shard_id(),
-    _Data,
+    generation_data(),
     _CookedBatch
 ) -> ok | emqx_ds:error(_).
 
--callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
+-callback get_streams(
+    shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     [_Stream].
 
--callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
+-callback make_iterator(
+    shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()
+) ->
     emqx_ds:make_iterator_result(_Iterator).
 
 -callback make_delete_iterator(
-    shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
+    shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
     emqx_ds:make_delete_iterator_result(_Iterator).
 
--callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) ->
+-callback next(
+    shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()
+) ->
     {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
 
 -callback delete_next(
-    shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time()
+    shard_id(),
+    generation_data(),
+    DeleteIterator,
+    emqx_ds:delete_selector(),
+    pos_integer(),
+    emqx_ds:time()
 ) ->
     {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}.
 
--callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent].
-
--callback post_creation_actions(post_creation_context()) -> _Data.
+-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) ->
+    [CustomEvent].
 
--optional_callbacks([post_creation_actions/1, handle_event/4]).
+-optional_callbacks([handle_event/4]).
 
 %%================================================================================
 %% API for the replication layer
@@ -686,42 +692,14 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
     server_state() | {error, overlaps_existing_generations}.
 handle_add_generation(S0, Since) ->
     #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0,
-
-    #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0,
-    OldKey = ?GEN_KEY(OldGenId),
-    #{OldKey := OldGenSchema} = Schema0,
-    #{cf_refs := OldCFRefs} = OldGenSchema,
-    #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0,
-
     Schema1 = update_last_until(Schema0, Since),
     Shard1 = update_last_until(Shard0, Since),
-
     case Schema1 of
         _Updated = #{} ->
-            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since),
+            {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Shard0, Since),
             CFRefs = NewCFRefs ++ CFRefs0,
             Key = ?GEN_KEY(GenId),
-            Generation0 =
-                #{data := NewGenData0} =
-                open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
-            %% When the new generation's module is the same as the last one, we might want to
-            %% perform actions like inheriting some of the previous (meta)data.
-            NewGenData =
-                run_post_creation_actions(
-                    #{
-                        shard_id => ShardId,
-                        db => DB,
-                        new_gen_id => GenId,
-                        old_gen_id => OldGenId,
-                        new_cf_refs => NewCFRefs,
-                        old_cf_refs => OldCFRefs,
-                        new_gen_runtime_data => NewGenData0,
-                        old_gen_runtime_data => OldGenData,
-                        new_module => CurrentMod,
-                        old_module => OldMod
-                    }
-                ),
-            Generation = Generation0#{data := NewGenData},
+            Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)),
             Shard = Shard1#{current_generation := GenId, Key => Generation},
             S0#s{
                 cf_refs = CFRefs,
@@ -834,9 +812,28 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) ->
 -spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) ->
     {gen_id(), shard_schema(), cf_refs()}.
 new_generation(ShardId, DB, Schema0, Since) ->
+    new_generation(ShardId, DB, Schema0, undefined, Since).
+
+-spec new_generation(
+    shard_id(),
+    rocksdb:db_handle(),
+    shard_schema(),
+    shard() | undefined,
+    emqx_ds:time()
+) ->
+    {gen_id(), shard_schema(), cf_refs()}.
+new_generation(ShardId, DB, Schema0, Shard0, Since) ->
     #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0,
+    case Shard0 of
+        #{?GEN_KEY(PrevGenId) := #{module := Mod} = PrevGen} ->
+            %% When the new generation's module is the same as the last one, we might want
+            %% to perform actions like inheriting some of the previous (meta)data.
+            PrevRuntimeData = maps:get(data, PrevGen);
+        _ ->
+            PrevRuntimeData = undefined
+    end,
     GenId = next_generation_id(PrevGenId),
-    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf),
+    {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf, PrevRuntimeData),
     GenSchema = #{
         module => Mod,
         data => GenData,
@@ -918,23 +915,6 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
             {error, overlaps_existing_generations}
     end.
 
-run_post_creation_actions(
-    #{
-        new_module := Mod,
-        old_module := Mod,
-        new_gen_runtime_data := NewGenData
-    } = Context
-) ->
-    case erlang:function_exported(Mod, post_creation_actions, 1) of
-        true ->
-            Mod:post_creation_actions(Context);
-        false ->
-            NewGenData
-    end;
-run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
-    %% Different implementation modules
-    NewGenData.
-
 handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
     Name = integer_to_list(erlang:system_time(millisecond)),
     Dir = checkpoint_dir(ShardId, Name),
@@ -1007,17 +987,17 @@ generation_get(Shard, GenId) ->
 
 -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
 generations_since(Shard, Since) ->
-    Schema = get_schema_runtime(Shard),
-    maps:fold(
-        fun
-            (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since ->
-                [GenId | Acc];
-            (_K, _V, Acc) ->
-                Acc
-        end,
-        [],
-        Schema
-    ).
+    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
+    list_generations_since(Schema, Current, Since).
+
+list_generations_since(Schema, GenId, Since) ->
+    case Schema of
+        #{?GEN_KEY(GenId) := #{until := Until}} when Until > Since ->
+            [GenId | list_generations_since(Schema, GenId - 1, Since)];
+        #{} ->
+            %% No more live generations.
+            []
+    end.
 
 format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) ->
     #{
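
With `post_creation_actions/1` removed, inheriting state from the previous generation is
now the responsibility of `create/5` itself. A minimal sketch of a hypothetical layout
module (only the changed callback is shown; a real module must implement the rest of the
behaviour, and the column-family name and inheritance helper are illustrative
assumptions):

    -module(my_ds_layout_example).
    -export([create/5]).

    %% SPrev is the runtime data of the previous generation, or 'undefined' when
    %% there is no previous generation or it was created by a different module.
    create(_ShardId, DBHandle, GenId, _Options, SPrev) ->
        CFName = "my_layout_data" ++ integer_to_list(GenId),
        {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
        case SPrev of
            undefined -> ok;
            _ -> inherit_metadata(DBHandle, CFHandle, SPrev)
        end,
        Schema = #{},
        {Schema, [{CFName, CFHandle}]}.

    inherit_metadata(_DBHandle, _CFHandle, _SPrev) ->
        %% e.g. copy learned topic structure into the new column family, as the
        %% bitfield-LTS layout does in copy_previous_trie/3.
        ok.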

+ 2 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl

@@ -28,7 +28,7 @@
 
 %% behavior callbacks:
 -export([
-    create/4,
+    create/5,
     open/5,
     drop/5,
     prepare_batch/4,
@@ -88,7 +88,7 @@
 %% behavior callbacks
 %%================================================================================
 
-create(_ShardId, DBHandle, GenId, _Options) ->
+create(_ShardId, DBHandle, GenId, _Options, _SPrev) ->
     CFName = data_cf(GenId),
     {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
     Schema = #schema{},

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},

+ 29 - 13
apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl

@@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) ->
     ?check_trace(
         begin
             %% Create a bunch of topics to be learned in the first generation
-            Timestamps = lists:seq(1, 10_000, 100),
-            Batch = [
-                begin
-                    Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]),
-                    {TS, make_message(TS, Topic, integer_to_binary(TS))}
-                end
+            TS1 = 500,
+            Batch1 = [
+                {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
              || I <- lists:seq(1, 200),
-                TS <- Timestamps,
                 Suffix <- [<<"foo">>, <<"bar">>]
             ],
-            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []),
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
             %% Now we create a new generation with the same LTS module.  It should inherit the
             %% learned trie.
-            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000),
+            ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
+            %% Restart the shard, to verify that LTS is persisted.
+            ok = application:stop(emqx_durable_storage),
+            ok = application:start(emqx_durable_storage),
+            ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG),
+            %% Store a batch of messages with the same set of topics.
+            TS2 = 1_500,
+            Batch2 = [
+                {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))}
+             || I <- lists:seq(1, 200),
+                Suffix <- [<<"foo">>, <<"bar">>]
+            ],
+            ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
+            %% We should get only two streams for wildcard query, for "foo" and for "bar".
+            ?assertMatch(
+                [_Foo, _Bar],
+                emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000)
+            ),
             ok
         end,
         fun(Trace) ->
@@ -211,10 +224,7 @@ t_replay(_Config) ->
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []),
     %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
     Batch2 = [
-        begin
-            Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]),
-            {TS, make_message(TS, Topic, integer_to_binary(TS))}
-        end
+        {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
      || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
     ],
     ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []),
@@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) ->
         payload = Payload
     }.
 
+make_topic(Tokens = [_ | _]) ->
+    emqx_topic:join([bin(T) || T <- Tokens]).
+
 payloads(Messages) ->
     lists:map(
         fun(#message{payload = P}) ->
@@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) ->
 parse_topic(Topic) ->
     emqx_topic:words(iolist_to_binary(Topic)).
 
+bin(X) ->
+    emqx_utils_conv:bin(X).
+
 %% CT callbacks
 
 all() -> emqx_common_test_helpers:all(?MODULE).

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -117,6 +117,7 @@
             emqx_bridge_oracle,
             emqx_bridge_rabbitmq,
             emqx_bridge_azure_event_hub,
+            emqx_s3,
             emqx_bridge_s3,
             emqx_bridge_azure_blob_storage,
             emqx_schema_registry,

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -6,7 +6,7 @@
     {vsn, "0.3.1"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, emqx_ctl]},
+    {applications, [kernel, stdlib, emqx_ctl, redbug]},
     {mod, {emqx_machine_app, []}},
     {env, []},
     {licenses, ["Apache-2.0"]},

+ 177 - 7
apps/emqx_machine/src/user_default.erl

@@ -19,21 +19,191 @@
 %% Import all the record definitions from the header file into the erlang shell.
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx_conf/include/emqx_conf.hrl").
 -include_lib("emqx_dashboard/include/emqx_dashboard.hrl").
 %% INCLUDE END
 
+-define(TIME, 3 * 60).
+-define(MESSAGE, 512).
+-define(GREEN, <<"\e[32;1m">>).
+-define(RED, <<"\e[31m">>).
+-define(RESET, <<"\e[0m">>).
+
 %% API
 -export([lock/0, unlock/0]).
--export([t/1, t2/1, t/2, t2/2, t/3, t2/3]).
+-export([trace/0, t/0, t/1, t/2, t_msg/0, t_msg/1, t_stop/0]).
+
+-dialyzer({nowarn_function, start_trace/3}).
+-dialyzer({no_return, [t/0, t/1, t/2]}).
 
 lock() -> emqx_restricted_shell:lock().
 unlock() -> emqx_restricted_shell:unlock().
 
-t(M) -> recon_trace:calls({M, '_', return_trace}, 300).
-t2(M) -> recon_trace:calls({M, '_', return_trace}, 300, [{args, arity}]).
-t(M, F) -> recon_trace:calls({M, F, return_trace}, 300).
-t2(M, F) -> recon_trace:calls({M, F, return_trace}, 300, [{args, arity}]).
-t(M, F, A) -> recon_trace:calls({M, F, A}, 300).
-t2(M, F, A) -> recon_trace:calls({M, F, A}, 300, [{args, arity}]).
+trace() ->
+    ?ULOG("Trace Usage:~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t(Mod, Func) -> trace a specific function.~n", []),
+    ?ULOG("  t(RTPs) -> trace in Redbug Trace Patterns.~n", []),
+    ?ULOG("       eg1: t(\"emqx_hooks:run\").~n", []),
+    ?ULOG("       eg2: t(\"emqx_hooks:run/2\").~n", []),
+    ?ULOG("       eg3: t(\"emqx_hooks:run/2 -> return\").~n", []),
+    ?ULOG(
+        "       eg4: t(\"emqx_hooks:run('message.dropped',[_, #{node := N}, _])"
+        "when N =:= 'emqx@127.0.0.1' -> stack,return\"~n",
+        []
+    ),
+    ?ULOG("  t() ->   when you forget the RTPs.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t_msg(PidorRegName) -> trace a pid/registered name's messages.~n", []),
+    ?ULOG("  t_msg([Pid,RegName]) -> trace a list of pids' messages.~n", []),
+    ?ULOG("  t_msg() ->  when you forget the pids.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t_stop() -> stop running trace.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ok.
+
+t_stop() ->
+    ensure_redbug_stop().
+
+t() ->
+    {M, F} = get_rtp_fun(),
+    t(M, F).
+
+t(M) ->
+    t(M, "").
+
+t(M, F) ->
+    ensure_redbug_stop(),
+    RTP = format_rtp(emqx_utils_conv:str(M), emqx_utils_conv:str(F)),
+    Pids = get_procs(erlang:system_info(process_count)),
+    Options = [{time, ?TIME * 1000}, {msgs, ?MESSAGE}, debug, {procs, Pids}],
+    start_trace(RTP, Options, Pids).
+
+t_msg() ->
+    ?ULOG("Tracing send/receive messages of specific pids: ~n", []),
+    Pids = get_pids(),
+    t_msg(Pids).
+
+t_msg([]) ->
+    exit("procs can't be empty");
+t_msg(Pids) when is_list(Pids) ->
+    ensure_redbug_stop(),
+    Options = [{time, ?TIME * 1000}, {msgs, ?MESSAGE}, {procs, Pids}],
+    start_trace(['send', 'receive'], Options, Pids);
+t_msg(Pid) ->
+    t_msg([Pid]).
+
+start_trace(RTP, Options, Pids) ->
+    info("~nredbug:start(~0p, ~0p)", [RTP, Options]),
+    case redbug:start(RTP, Options) of
+        {argument_error, no_matching_functions} ->
+            warning("~p no matching function", [RTP]);
+        {argument_error, no_matching_processes} ->
+            case Pids of
+                [Pid] -> warning("~p is dead", [Pid]);
+                _ -> warning("~p are dead", [Pids])
+            end;
+        {argument_error, Reason} ->
+            warning("argument_error:~p~n", [Reason]);
+        normal ->
+            warning("bad RTPs: ~p", [RTP]);
+        {_Name, ProcessCount, 0} ->
+            info(
+                "Tracing (~w) processes matching ~p within ~w seconds",
+                [ProcessCount, RTP, ?TIME]
+            );
+        {_Name, ProcessCount, FunCount} ->
+            info(
+                "Tracing (~w) processes matching ~ts within ~w seconds and ~w function",
+                [ProcessCount, RTP, ?TIME, FunCount]
+            )
+    end.
+
+get_rtp_fun() ->
+    RTP0 = io:get_line("Module:Function | Module | RTPs:\n"),
+    RTP1 = string:trim(RTP0, both, " \n"),
+    case string:split(RTP1, ":") of
+        [M] -> {M, get_function()};
+        [M, ""] -> {M, get_function()};
+        [M, F] -> {M, F}
+    end.
+
+get_function() ->
+    ?ULOG("Function(func|func/3|func('_', atom, X) when is_integer(X)) :~n", []),
+    F0 = io:get_line(""),
+    string:trim(F0, both, " \n").
+
+format_rtp("", _) ->
+    exit("Module can't be empty");
+format_rtp(M, "") ->
+    add_return(M);
+format_rtp(M, F) ->
+    M ++ ":" ++ add_return(F).
+
+add_return(M) ->
+    case string:find(M, "->") of
+        nomatch -> M ++ "-> return";
+        _ -> M
+    end.
+
+get_procs(ProcCount) when ProcCount > 2500 ->
+    warning("Tracing all (~w) processes can be very risky", [ProcCount]),
+    get_pids();
+get_procs(_ProcCount) ->
+    all.
+
+get_pids() ->
+    Str = io:get_line("<0.1.0>|<0.1.0>,<0.2.0>|all|new|running|RegName:"),
+    try
+        lists:map(fun parse_pid/1, string:tokens(Str, ", \n"))
+    catch
+        throw:{not_registered, Name} ->
+            warning("~ts not registered~n", [Name]),
+            get_pids();
+        throw:new ->
+            new;
+        throw:running ->
+            running;
+        throw:quit ->
+            throw(quit);
+        throw:all ->
+            all;
+        _:_ ->
+            warning("Invalid pid: ~ts~n:", [Str]),
+            get_pids()
+    end.
+
+parse_pid("<0." ++ _ = L) ->
+    list_to_pid(L);
+parse_pid("all") ->
+    throw(all);
+parse_pid("new") ->
+    throw(new);
+parse_pid("running") ->
+    throw(running);
+parse_pid("q") ->
+    throw(quit);
+parse_pid(NameStr) ->
+    case emqx_utils:safe_to_existing_atom(NameStr, utf8) of
+        {ok, Name} ->
+            case whereis(Name) of
+                undefined -> throw({not_registered, NameStr});
+                Pid -> Pid
+            end;
+        {error, _} ->
+            throw({not_registered, NameStr})
+    end.
+
+warning(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?RED] ++ Args ++ [?RESET]).
+info(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?GREEN] ++ Args ++ [?RESET]).
+
+ensure_redbug_stop() ->
+    case redbug:stop() of
+        not_started ->
+            ok;
+        stopped ->
+            timer:sleep(80),
+            ok
+    end.
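
A typical interaction with the new helpers from the EMQX remote console (output elided;
the traced module/function is just an example taken from the built-in help text):

    trace().                              %% print the usage summary
    t("emqx_hooks:run/2 -> return").      %% trace emqx_hooks:run/2, including return values
    t_msg(self()).                        %% trace send/receive messages of a pid (or registered name)
    t_stop().                             %% stop the currently running trace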

+ 2 - 1
apps/emqx_message_transformation/src/emqx_message_transformation.app.src

@@ -6,7 +6,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx
+        emqx,
+        emqx_schema_registry
     ]},
     {env, []},
     {modules, []},

+ 182 - 109
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -26,20 +26,28 @@
     on_message_publish/1
 ]).
 
+%% Internal exports
+-export([run_transformation/2, trace_failure_context_to_map/1]).
+
 %%------------------------------------------------------------------------------
 %% Type declarations
 %%------------------------------------------------------------------------------
 
 -define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
--define(CONF_ROOT, message_transformation).
--define(CONF_ROOT_BIN, <<"message_transformation">>).
--define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
+
+-record(trace_failure_context, {
+    transformation :: transformation(),
+    tag :: string(),
+    context :: map()
+}).
+-type trace_failure_context() :: #trace_failure_context{}.
 
 -type transformation_name() :: binary().
 %% TODO: make more specific typespec
 -type transformation() :: #{atom() => term()}.
 %% TODO: make more specific typespec
 -type variform() :: any().
+-type failure_action() :: ignore | drop | disconnect.
 -type operation() :: #{key := [binary(), ...], value := variform()}.
 -type qos() :: 0..2.
 -type rendered_value() :: qos() | boolean() | binary().
@@ -62,7 +70,8 @@
 
 -export_type([
     transformation/0,
-    transformation_name/0
+    transformation_name/0,
+    failure_action/0
 ]).
 
 %%------------------------------------------------------------------------------
@@ -125,19 +134,50 @@ on_message_publish(Message = #message{topic = Topic}) ->
 %% Internal exports
 %%------------------------------------------------------------------------------
 
+-spec run_transformation(transformation(), emqx_types:message()) ->
+    {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
+run_transformation(Transformation, MessageIn) ->
+    #{
+        operations := Operations,
+        failure_action := FailureAction,
+        payload_decoder := PayloadDecoder
+    } = Transformation,
+    Fun = fun(Operation, Acc) ->
+        case eval_operation(Operation, Transformation, Acc) of
+            {ok, NewAcc} -> {cont, NewAcc};
+            {error, TraceFailureContext} -> {halt, {error, TraceFailureContext}}
+        end
+    end,
+    PayloadIn = MessageIn#message.payload,
+    case decode(PayloadIn, PayloadDecoder, Transformation) of
+        {ok, InitPayload} ->
+            InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
+            case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
+                #{} = ContextOut ->
+                    context_to_message(MessageIn, ContextOut, Transformation);
+                {error, TraceFailureContext} ->
+                    {FailureAction, TraceFailureContext}
+            end;
+        {error, TraceFailureContext} ->
+            {FailureAction, TraceFailureContext}
+    end.
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------
 
--spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error.
+-spec eval_operation(operation(), transformation(), eval_context()) ->
+    {ok, eval_context()} | {error, trace_failure_context()}.
 eval_operation(Operation, Transformation, Context) ->
     #{key := K, value := V} = Operation,
     case eval_variform(K, V, Context) of
         {error, Reason} ->
-            trace_failure(Transformation, "transformation_eval_operation_failure", #{
-                reason => Reason
-            }),
-            error;
+            FailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "transformation_eval_operation_failure",
+                context = #{reason => Reason}
+            },
+            {error, FailureContext};
         {ok, Rendered} ->
             NewContext = put_value(K, Rendered, Context),
             {ok, NewContext}
@@ -233,14 +273,16 @@ do_run_transformations(Transformations, Message) ->
         #{name := Name} = Transformation,
         emqx_message_transformation_registry:inc_matched(Name),
         case run_transformation(Transformation, MessageAcc) of
-            #message{} = NewAcc ->
+            {ok, #message{} = NewAcc} ->
                 emqx_message_transformation_registry:inc_succeeded(Name),
                 {cont, NewAcc};
-            ignore ->
+            {ignore, TraceFailureContext} ->
+                trace_failure_from_context(TraceFailureContext),
                 emqx_message_transformation_registry:inc_failed(Name),
                 run_message_transformation_failed_hook(Message, Transformation),
                 {cont, MessageAcc};
-            FailureAction ->
+            {FailureAction, TraceFailureContext} ->
+                trace_failure_from_context(TraceFailureContext),
                 trace_failure(Transformation, "transformation_failed", #{
                     transformation => Name,
                     action => FailureAction
@@ -270,33 +312,6 @@ do_run_transformations(Transformations, Message) ->
             FailureAction
     end.
 
-run_transformation(Transformation, MessageIn) ->
-    #{
-        operations := Operations,
-        failure_action := FailureAction,
-        payload_decoder := PayloadDecoder
-    } = Transformation,
-    Fun = fun(Operation, Acc) ->
-        case eval_operation(Operation, Transformation, Acc) of
-            {ok, NewAcc} -> {cont, NewAcc};
-            error -> {halt, FailureAction}
-        end
-    end,
-    PayloadIn = MessageIn#message.payload,
-    case decode(PayloadIn, PayloadDecoder, Transformation) of
-        {ok, InitPayload} ->
-            InitAcc = message_to_context(MessageIn, InitPayload, Transformation),
-            case emqx_utils:foldl_while(Fun, InitAcc, Operations) of
-                #{} = ContextOut ->
-                    context_to_message(MessageIn, ContextOut, Transformation);
-                _ ->
-                    FailureAction
-            end;
-        error ->
-            %% Error already logged
-            FailureAction
-    end.
-
 -spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context().
 message_to_context(#message{} = Message, Payload, Transformation) ->
     #{
@@ -321,7 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
     }.
 
 -spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
-    {ok, emqx_types:message()} | _TODO.
+    {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}.
 context_to_message(Message, Context, Transformation) ->
     #{
         failure_action := FailureAction,
@@ -330,9 +345,9 @@ context_to_message(Message, Context, Transformation) ->
     #{payload := PayloadOut} = Context,
     case encode(PayloadOut, PayloadEncoder, Transformation) of
         {ok, Payload} ->
-            take_from_context(Context#{payload := Payload}, Message);
-        error ->
-            FailureAction
+            {ok, take_from_context(Context#{payload := Payload}, Message)};
+        {error, TraceFailureContext} ->
+            {FailureAction, TraceFailureContext}
     end.
 
 take_from_context(Context, Message) ->
@@ -362,31 +377,43 @@ decode(Payload, #{type := json}, Transformation) ->
         {ok, JSON} ->
             {ok, JSON};
         {error, Reason} ->
-            trace_failure(Transformation, "payload_decode_failed", #{
-                decoder => json,
-                reason => Reason
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_failed",
+                context = #{
+                    decoder => json,
+                    reason => Reason
+                }
+            },
+            {error, TraceFailureContext}
     end;
 decode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
     try
         {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)}
     catch
         error:{serde_not_found, _} ->
-            trace_failure(Transformation, "payload_decode_schema_not_found", #{
-                decoder => avro,
-                schema_name => SerdeName
-            }),
-            error;
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_schema_not_found",
+                context = #{
+                    decoder => avro,
+                    schema_name => SerdeName
+                }
+            },
+            {error, TraceFailureContext};
         Class:Error:Stacktrace ->
-            trace_failure(Transformation, "payload_decode_schema_failure", #{
-                decoder => avro,
-                schema_name => SerdeName,
-                kind => Class,
-                reason => Error,
-                stacktrace => Stacktrace
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_schema_failure",
+                context = #{
+                    decoder => avro,
+                    schema_name => SerdeName,
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace
+                }
+            },
+            {error, TraceFailureContext}
     end;
 decode(
     Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
@@ -395,22 +422,30 @@ decode(
         {ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])}
     catch
         error:{serde_not_found, _} ->
-            trace_failure(Transformation, "payload_decode_schema_not_found", #{
-                decoder => protobuf,
-                schema_name => SerdeName,
-                message_type => MessageType
-            }),
-            error;
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_schema_not_found",
+                context = #{
+                    decoder => protobuf,
+                    schema_name => SerdeName,
+                    message_type => MessageType
+                }
+            },
+            {error, TraceFailureContext};
         Class:Error:Stacktrace ->
-            trace_failure(Transformation, "payload_decode_schema_failure", #{
-                decoder => protobuf,
-                schema_name => SerdeName,
-                message_type => MessageType,
-                kind => Class,
-                reason => Error,
-                stacktrace => Stacktrace
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_decode_schema_failure",
+                context = #{
+                    decoder => protobuf,
+                    schema_name => SerdeName,
+                    message_type => MessageType,
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace
+                }
+            },
+            {error, TraceFailureContext}
     end.
 
 encode(Payload, #{type := none}, _Transformation) ->
@@ -420,31 +455,43 @@ encode(Payload, #{type := json}, Transformation) ->
         {ok, Bin} ->
             {ok, Bin};
         {error, Reason} ->
-            trace_failure(Transformation, "payload_encode_failed", #{
-                encoder => json,
-                reason => Reason
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_encode_failed",
+                context = #{
+                    encoder => json,
+                    reason => Reason
+                }
+            },
+            {error, TraceFailureContext}
     end;
 encode(Payload, #{type := avro, schema := SerdeName}, Transformation) ->
     try
         {ok, emqx_schema_registry_serde:encode(SerdeName, Payload)}
     catch
         error:{serde_not_found, _} ->
-            trace_failure(Transformation, "payload_encode_schema_not_found", #{
-                encoder => avro,
-                schema_name => SerdeName
-            }),
-            error;
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_encode_schema_not_found",
+                context = #{
+                    encoder => avro,
+                    schema_name => SerdeName
+                }
+            },
+            {error, TraceFailureContext};
         Class:Error:Stacktrace ->
-            trace_failure(Transformation, "payload_encode_schema_failure", #{
-                encoder => avro,
-                schema_name => SerdeName,
-                kind => Class,
-                reason => Error,
-                stacktrace => Stacktrace
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_encode_schema_failure",
+                context = #{
+                    encoder => avro,
+                    schema_name => SerdeName,
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace
+                }
+            },
+            {error, TraceFailureContext}
     end;
 encode(
     Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation
@@ -453,24 +500,50 @@ encode(
         {ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])}
     catch
         error:{serde_not_found, _} ->
-            trace_failure(Transformation, "payload_encode_schema_not_found", #{
-                encoder => protobuf,
-                schema_name => SerdeName,
-                message_type => MessageType
-            }),
-            error;
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_encode_schema_not_found",
+                context = #{
+                    encoder => protobuf,
+                    schema_name => SerdeName,
+                    message_type => MessageType
+                }
+            },
+            {error, TraceFailureContext};
         Class:Error:Stacktrace ->
-            trace_failure(Transformation, "payload_encode_schema_failure", #{
-                encoder => protobuf,
-                schema_name => SerdeName,
-                message_type => MessageType,
-                kind => Class,
-                reason => Error,
-                stacktrace => Stacktrace
-            }),
-            error
+            TraceFailureContext = #trace_failure_context{
+                transformation = Transformation,
+                tag = "payload_encode_schema_failure",
+                context = #{
+                    encoder => protobuf,
+                    schema_name => SerdeName,
+                    message_type => MessageType,
+                    kind => Class,
+                    reason => Error,
+                    stacktrace => Stacktrace
+                }
+            },
+            {error, TraceFailureContext}
     end.
 
+trace_failure_from_context(
+    #trace_failure_context{
+        transformation = Transformation,
+        tag = Tag,
+        context = Context
+    }
+) ->
+    trace_failure(Transformation, Tag, Context).
+
+%% Internal export for HTTP API.
+trace_failure_context_to_map(
+    #trace_failure_context{
+        tag = Tag,
+        context = Context
+    }
+) ->
+    Context#{msg => list_to_binary(Tag)}.
+
 trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) ->
     #{
         name := _Name,

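A small sketch (not part of the diff) of the pattern this refactor enables: the payload decode/encode helpers now return `{error, #trace_failure_context{}}` instead of logging right away, so each caller decides what to do with the failure. The `handle_failure/2` wrapper below is hypothetical; `trace_failure_from_context/1` and `trace_failure_context_to_map/1` are the helpers introduced in the hunk above.

```erlang
%% Hypothetical wrapper; sketch only, assuming it lives in the same module.
handle_failure(hook, {FailureAction, TraceFailureContext}) ->
    %% Broker hook path: log according to the transformation's log_failure level,
    %% then let the configured failure action take effect.
    trace_failure_from_context(TraceFailureContext),
    FailureAction;
handle_failure(dryrun, {_FailureAction, TraceFailureContext}) ->
    %% HTTP dryrun path: render the same context as a JSON-friendly map instead.
    {400, trace_failure_context_to_map(TraceFailureContext)}.
```
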
+ 147 - 0
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -8,6 +8,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("emqx_utils/include/emqx_utils_api.hrl").
 
 %% `minirest' and `minirest_trails' API
@@ -23,6 +24,7 @@
 -export([
     '/message_transformations'/2,
     '/message_transformations/reorder'/2,
+    '/message_transformations/dryrun'/2,
     '/message_transformations/transformation/:name'/2,
     '/message_transformations/transformation/:name/metrics'/2,
     '/message_transformations/transformation/:name/metrics/reset'/2,
@@ -36,6 +38,9 @@
 -define(TAGS, [<<"Message Transformation">>]).
 -define(METRIC_NAME, message_transformation).
 
+-type user_property() :: #{binary() => binary()}.
+-reflect_type([user_property/0]).
+
 %%-------------------------------------------------------------------------------------------------
 %% `minirest' and `minirest_trails' API
 %%-------------------------------------------------------------------------------------------------
@@ -49,6 +54,7 @@ paths() ->
     [
         "/message_transformations",
         "/message_transformations/reorder",
+        "/message_transformations/dryrun",
         "/message_transformations/transformation/:name",
         "/message_transformations/transformation/:name/metrics",
         "/message_transformations/transformation/:name/metrics/reset",
@@ -143,6 +149,25 @@ schema("/message_transformations/reorder") ->
                 }
         }
     };
+schema("/message_transformations/dryrun") ->
+    #{
+        'operationId' => '/message_transformations/dryrun',
+        post => #{
+            tags => ?TAGS,
+            summary => <<"Test an input against a configuration">>,
+            description => ?DESC("dryrun_transformation"),
+            'requestBody' =>
+                emqx_dashboard_swagger:schema_with_examples(
+                    ref(dryrun_transformation),
+                    example_input_dryrun_transformation()
+                ),
+            responses =>
+                #{
+                    200 => <<"TODO">>,
+                    400 => error_schema('BAD_REQUEST', <<"Bad request">>)
+                }
+        }
+    };
 schema("/message_transformations/transformation/:name") ->
     #{
         'operationId' => '/message_transformations/transformation/:name',
@@ -267,6 +292,29 @@ fields(reorder) ->
     [
         {order, mk(array(binary()), #{required => true, in => body})}
     ];
+fields(dryrun_transformation) ->
+    [
+        {transformation,
+            mk(
+                hoconsc:ref(emqx_message_transformation_schema, transformation),
+                #{required => true, in => body}
+            )},
+        {message, mk(ref(dryrun_input_message), #{required => true, in => body})}
+    ];
+fields(dryrun_input_message) ->
+    %% See `emqx_message_transformation:eval_context()'.
+    [
+        {client_attrs, mk(map(), #{default => #{}})},
+        {payload, mk(binary(), #{required => true})},
+        {qos, mk(range(0, 2), #{default => 0})},
+        {retain, mk(boolean(), #{default => false})},
+        {topic, mk(binary(), #{required => true})},
+        {user_property,
+            mk(
+                typerefl:alias("map(binary(), binary())", user_property()),
+                #{default => #{}}
+            )}
+    ];
 fields(get_metrics) ->
     [
         {metrics, mk(ref(metrics), #{})},
@@ -343,6 +391,9 @@ fields(node_metrics) ->
 '/message_transformations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
     do_reorder(Order).
 
+'/message_transformations/dryrun'(post, #{body := Params}) ->
+    do_transformation_dryrun(Params).
+
 '/message_transformations/transformation/:name/enable/:enable'(post, #{
     bindings := #{name := Name, enable := Enable}
 }) ->
@@ -436,6 +487,17 @@ example_input_reorder() ->
             }
     }.
 
+example_input_dryrun_transformation() ->
+    #{
+        <<"test">> =>
+            #{
+                summary => <<"Test an input against a configuration">>,
+                value => #{
+                    todo => true
+                }
+            }
+    }.
+
 example_return_list() ->
     OtherVal0 = example_transformation([example_avro_check()]),
     OtherVal = OtherVal0#{name => <<"other_transformation">>},
@@ -541,6 +603,20 @@ do_reorder(Order) ->
             ?BAD_REQUEST(Error)
     end.
 
+do_transformation_dryrun(Params) ->
+    #{
+        transformation := Transformation,
+        message := Message
+    } = dryrun_input_message_in(Params),
+    case emqx_message_transformation:run_transformation(Transformation, Message) of
+        {ok, #message{} = FinalMessage} ->
+            MessageOut = dryrun_input_message_out(FinalMessage),
+            ?OK(MessageOut);
+        {_FailureAction, TraceFailureContext} ->
+            Result = trace_failure_context_out(TraceFailureContext),
+            {400, Result}
+    end.
+
 do_enable_disable(Transformation, Enable) ->
     RawTransformation = make_serializable(Transformation),
     case emqx_message_transformation:update(RawTransformation#{<<"enable">> => Enable}) of
@@ -654,3 +730,74 @@ operation_out(Operation0) ->
         fun(Path) -> iolist_to_binary(lists:join(".", Path)) end,
         Operation
     ).
+
+dryrun_input_message_in(Params) ->
+    %% We already check the params against the schema at the API boundary, so we can
+    %% expect it to succeed here.
+    #{root := Result = #{message := Message0}} =
+        hocon_tconf:check_plain(
+            #{roots => [{root, ref(dryrun_transformation)}]},
+            #{<<"root">> => Params},
+            #{atom_key => true}
+        ),
+    #{
+        client_attrs := ClientAttrs,
+        payload := Payload,
+        qos := QoS,
+        retain := Retain,
+        topic := Topic,
+        user_property := UserProperty0
+    } = Message0,
+    UserProperty = maps:to_list(UserProperty0),
+    Message1 = #{
+        id => emqx_guid:gen(),
+        timestamp => emqx_message:timestamp_now(),
+        extra => #{},
+        from => <<"test-clientid">>,
+
+        flags => #{retain => Retain},
+        qos => QoS,
+        topic => Topic,
+        payload => Payload,
+        headers => #{
+            client_attrs => ClientAttrs,
+            properties => #{'User-Property' => UserProperty}
+        }
+    },
+    Message = emqx_message:from_map(Message1),
+    Result#{message := Message}.
+
+dryrun_input_message_out(#message{} = Message) ->
+    Retain = emqx_message:get_flag(retain, Message, false),
+    Props = emqx_message:get_header(properties, Message, #{}),
+    UserProperty0 = maps:get('User-Property', Props, []),
+    UserProperty = maps:from_list(UserProperty0),
+    MessageOut0 = emqx_message:to_map(Message),
+    MessageOut = maps:with([payload, qos, topic], MessageOut0),
+    MessageOut#{
+        retain => Retain,
+        user_property => UserProperty
+    }.
+
+trace_failure_context_out(TraceFailureContext) ->
+    Context0 = emqx_message_transformation:trace_failure_context_to_map(TraceFailureContext),
+    %% Some context keys may not be JSON-encodable.
+    maps:filtermap(
+        fun
+            (reason, Reason) ->
+                case emqx_utils_json:safe_encode(Reason) of
+                    {ok, _} ->
+                        %% Let minirest encode it if it's structured.
+                        true;
+                    {error, _} ->
+                        %% "Best effort"
+                        {true, iolist_to_binary(io_lib:format("~p", [Reason]))}
+                end;
+            (stacktrace, _Stacktrace) ->
+                %% Log?
+                false;
+            (_Key, _Value) ->
+                true
+        end,
+        Context0
+    ).

+ 122 - 0
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -140,6 +140,31 @@ topic_operation(VariformExpr) ->
 operation(Key, VariformExpr) ->
     {Key, VariformExpr}.
 
+json_serde() ->
+    #{<<"type">> => <<"json">>}.
+
+avro_serde(SerdeName) ->
+    #{<<"type">> => <<"avro">>, <<"schema">> => SerdeName}.
+
+dryrun_input_message() ->
+    dryrun_input_message(_Overrides = #{}).
+
+dryrun_input_message(Overrides) ->
+    dryrun_input_message(Overrides, _Opts = #{}).
+
+dryrun_input_message(Overrides, Opts) ->
+    Encoder = maps:get(encoder, Opts, fun emqx_utils_json:encode/1),
+    Defaults = #{
+        client_attrs => #{},
+        payload => #{},
+        qos => 2,
+        retain => true,
+        topic => <<"t/u/v">>,
+        user_property => #{}
+    },
+    InputMessage0 = emqx_utils_maps:deep_merge(Defaults, Overrides),
+    maps:update_with(payload, Encoder, InputMessage0).
+
 api_root() -> "message_transformations".
 
 simplify_result(Res) ->
@@ -246,6 +271,13 @@ import_backup(BackupName) ->
     Res = request(post, Path, Body),
     simplify_result(Res).
 
+dryrun_transformation(Transformation, Message) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root(), "dryrun"]),
+    Params = #{transformation => Transformation, message => Message},
+    Res = request(post, Path, Params),
+    ct:pal("dryrun transformation result:\n  ~p", [Res]),
+    simplify_result(Res).
+
 connect(ClientId) ->
     connect(ClientId, _IsPersistent = false).
 
@@ -1491,3 +1523,93 @@ t_client_attrs(_Config) ->
         []
     ),
     ok.
+
+%% Smoke tests for the dryrun endpoint.
+t_dryrun_transformation(_Config) ->
+    ?check_trace(
+        begin
+            Name1 = <<"foo">>,
+            Operations = [
+                operation(qos, <<"payload.q">>),
+                operation(topic, <<"concat([topic, '/', payload.t])">>),
+                operation(retain, <<"payload.r">>),
+                operation(<<"user_property.a">>, <<"payload.u.a">>),
+                operation(<<"payload">>, <<"payload.p.hello">>)
+            ],
+            Transformation1 = transformation(Name1, Operations),
+
+            %% Good input
+            Message1 = dryrun_input_message(#{
+                payload => #{
+                    p => #{<<"hello">> => <<"world">>},
+                    q => 1,
+                    r => true,
+                    t => <<"t">>,
+                    u => #{a => <<"b">>}
+                }
+            }),
+            ?assertMatch(
+                {200, #{
+                    <<"payload">> := <<"\"world\"">>,
+                    <<"qos">> := 1,
+                    <<"retain">> := true,
+                    <<"topic">> := <<"t/u/v/t">>,
+                    <<"user_property">> := #{<<"a">> := <<"b">>}
+                }},
+                dryrun_transformation(Transformation1, Message1)
+            ),
+
+            %% Bad input: fails to decode
+            Message2 = dryrun_input_message(#{payload => "{"}, #{encoder => fun(X) -> X end}),
+            ?assertMatch(
+                {400, #{
+                    <<"decoder">> := <<"json">>,
+                    <<"reason">> := <<_/binary>>
+                }},
+                dryrun_transformation(Transformation1, Message2)
+            ),
+
+            %% Bad output: fails to encode
+            MissingSerde = <<"missing_serde">>,
+            Transformation2 = transformation(Name1, [dummy_operation()], #{
+                <<"payload_decoder">> => json_serde(),
+                <<"payload_encoder">> => avro_serde(MissingSerde)
+            }),
+            ?assertMatch(
+                {400, #{
+                    <<"msg">> := <<"payload_encode_schema_not_found">>,
+                    <<"encoder">> := <<"avro">>,
+                    <<"schema_name">> := MissingSerde
+                }},
+                dryrun_transformation(Transformation2, Message1)
+            ),
+
+            %% Bad input: unbound var during one of the operations
+            Message3 = dryrun_input_message(#{
+                payload => #{
+                    p => #{<<"hello">> => <<"world">>},
+                    q => 1,
+                    %% Missing:
+                    %% r => true,
+                    t => <<"t">>,
+                    u => #{a => <<"b">>}
+                }
+            }),
+            ?assertMatch(
+                {400, #{
+                    <<"msg">> :=
+                        <<"transformation_eval_operation_failure">>,
+                    <<"reason">> :=
+                        #{
+                            <<"reason">> := <<"var_unbound">>,
+                            <<"var_name">> := <<"payload.r">>
+                        }
+                }},
+                dryrun_transformation(Transformation1, Message3)
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 73 - 50
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -59,6 +59,9 @@
     default_port => ?PGSQL_DEFAULT_PORT
 }).
 
+-type connector_resource_id() :: binary().
+-type action_resource_id() :: binary().
+
 -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
 -type state() ::
     #{
@@ -319,38 +322,40 @@ do_check_channel_sql(
 on_get_channels(ResId) ->
     emqx_bridge_v2:get_channels_for_connector(ResId).
 
-on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
-    on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
+-spec on_query
+    %% Called from authn and authz modules
+    (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) ->
+        {ok, _} | {error, term()};
+    %% Called from bridges
+    (connector_resource_id(), {action_resource_id(), map()}, state()) ->
+        {ok, _} | {error, term()}.
+on_query(InstId, {TypeOrKey, NameOrMap}, State) ->
+    on_query(InstId, {TypeOrKey, NameOrMap, []}, State);
 on_query(
     InstId,
-    {TypeOrKey, NameOrSQL, Params},
+    {TypeOrKey, NameOrMap, Params},
     #{pool_name := PoolName} = State
 ) ->
     ?SLOG(debug, #{
         msg => "postgresql_connector_received_sql_query",
         connector => InstId,
         type => TypeOrKey,
-        sql => NameOrSQL,
+        sql => NameOrMap,
         state => State
     }),
-    Type = pgsql_query_type(TypeOrKey, State),
-    {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
-    Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
+    {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State),
+    emqx_trace:rendered_action_template(
+        TypeOrKey,
+        #{
+            statement_type => QueryType,
+            statement_or_name => NameOrSQL2,
+            data => Data
+        }
+    ),
+    Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data),
     ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
     handle_result(Res).
 
-pgsql_query_type(_TypeOrTag, #{prepares := disabled}) ->
-    query;
-pgsql_query_type(sql, _ConnectorState) ->
-    query;
-pgsql_query_type(query, _ConnectorState) ->
-    query;
-pgsql_query_type(prepared_query, _ConnectorState) ->
-    prepared_query;
-%% for bridge
-pgsql_query_type(_, ConnectorState) ->
-    pgsql_query_type(prepared_query, ConnectorState).
-
 on_batch_query(
     InstId,
     [{Key, _} = Request | _] = BatchReq,
@@ -370,7 +375,15 @@ on_batch_query(
         {_Statement, RowTemplate} ->
             StatementTemplate = get_templated_statement(BinKey, State),
             Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
-            case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of
+            emqx_trace:rendered_action_template(
+                Key,
+                #{
+                    statement_type => execute_batch,
+                    statement_or_name => StatementTemplate,
+                    data => Rows
+                }
+            ),
+            case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of
                 {error, _Error} = Result ->
                     handle_result(Result);
                 {_Column, Results} ->
@@ -386,25 +399,38 @@ on_batch_query(InstId, BatchReq, State) ->
     }),
     {error, {unrecoverable_error, invalid_request}}.
 
-proc_sql_params(query, SQLOrKey, Params, _State) ->
-    {SQLOrKey, Params};
-proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
-    {SQLOrKey, Params};
-proc_sql_params(TypeOrKey, SQLOrData, Params, State) ->
-    DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled,
-    BinKey = to_bin(TypeOrKey),
-    case get_template(BinKey, State) of
-        undefined ->
-            {SQLOrData, Params};
-        {Statement, RowTemplate} ->
-            Rendered = render_prepare_sql_row(RowTemplate, SQLOrData),
-            case DisablePreparedStatements of
-                true ->
-                    {Statement, Rendered};
-                false ->
-                    {BinKey, Rendered}
-            end
-    end.
+proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) ->
+    %% When this connector is called from actions/bridges.
+    DisablePreparedStatements = prepared_statements_disabled(State),
+    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
+    Rendered = render_prepare_sql_row(RowTemplate, Map),
+    case DisablePreparedStatements of
+        true ->
+            {query, ExprTemplate, Rendered};
+        false ->
+            {prepared_query, ActionResId, Rendered}
+    end;
+proc_sql_params(prepared_query, ConnResId, Params, State) ->
+    %% When this connector is called from authn/authz modules
+    DisablePreparedStatements = prepared_statements_disabled(State),
+    case DisablePreparedStatements of
+        true ->
+            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
+            {query, ExprTemplate, Params};
+        false ->
+            %% Connector resource id itself is the prepared statement name
+            {prepared_query, ConnResId, Params}
+    end;
+proc_sql_params(QueryType, SQL, Params, _State) when
+    is_atom(QueryType) andalso
+        (is_binary(SQL) orelse is_list(SQL)) andalso
+        is_list(Params)
+->
+    %% When called to do ad-hoc commands/queries.
+    {QueryType, SQL, Params}.
+
+prepared_statements_disabled(State) ->
+    maps:get(prepares, State, #{}) =:= disabled.
 
 get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
     BinKey = to_bin(Key),
@@ -420,21 +446,17 @@ get_templated_statement(Key, #{installed_channels := Channels} = _State) when
 ->
     BinKey = to_bin(Key),
     ChannelState = maps:get(BinKey, Channels),
-    ChannelPreparedStatements = maps:get(prepares, ChannelState),
-    maps:get(BinKey, ChannelPreparedStatements);
+    case ChannelState of
+        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
+            ExprTemplate;
+        #{prepares := #{BinKey := ExprTemplate}} ->
+            ExprTemplate
+    end;
 get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, PrepStatements).
 
-on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) ->
-    emqx_trace:rendered_action_template(
-        Key,
-        #{
-            statement_type => Type,
-            statement_or_name => NameOrSQL,
-            data => Data
-        }
-    ),
+on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
     try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
         {error, Reason} ->
             ?tp(
@@ -785,6 +807,7 @@ handle_batch_result([{error, Error} | _Rest], _Acc) ->
     TranslatedError = translate_to_log_context(Error),
     {error, {unrecoverable_error, export_error(TranslatedError)}};
 handle_batch_result([], Acc) ->
+    ?tp("postgres_success_batch_result", #{row_count => Acc}),
     {ok, Acc}.
 
 translate_to_log_context({error, Reason}) ->

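A standalone sketch (not EMQX code) of the dispatch implemented by the reworked `proc_sql_params/4` clauses above, i.e. which statement type each kind of caller ends up with; the resource ids and SQL below are made up:

```erlang
-module(pgsql_dispatch_sketch).
-export([statement_type/2]).

%% Bridge/action call: a binary action resource id plus a rendered data map.
statement_type({ActionResId, #{} = _Row}, _PreparesDisabled = true) when is_binary(ActionResId) ->
    query;
statement_type({ActionResId, #{} = _Row}, _PreparesDisabled = false) when is_binary(ActionResId) ->
    prepared_query;
%% authn/authz call: the connector resource id doubles as the statement name.
statement_type({prepared_query, _ConnResId, _Params}, true) ->
    query;
statement_type({prepared_query, _ConnResId, _Params}, false) ->
    prepared_query;
%% Ad-hoc commands keep their explicit query type.
statement_type({QueryType, _SQL, _Params}, _) when is_atom(QueryType) ->
    QueryType.
```

For example, `pgsql_dispatch_sketch:statement_type({prepared_query, <<"connector:pgsql:auth">>, [<<"user1">>]}, false)` returns `prepared_query`, mirroring how authn/authz keep using prepared statements unless `prepares` is disabled.
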
+ 1 - 1
apps/emqx_utils/src/emqx_variform.erl

@@ -276,7 +276,7 @@ resolve_var_value(VarName, Bindings, _Opts) ->
             Value;
         {error, _Reason} ->
             throw(#{
-                var_name => VarName,
+                var_name => iolist_to_binary(VarName),
                 reason => var_unbound
             })
     end.
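
The one-line change above flattens the variform variable name (which may arrive as an iolist) into a binary before it is thrown, so the `var_unbound` error context carries a single flat string, as the dryrun test above expects (`<<"var_name">> := <<"payload.r">>`). A standalone illustration of the flattening; the concrete iolist shape is an assumption:

```erlang
1> iolist_to_binary(["payload", [$., "r"]]).
<<"payload.r">>
```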

+ 22 - 0
changes/ce/feat-13191.en.md

@@ -0,0 +1,22 @@
+Upgrade EMQX Docker images to run on Erlang/OTP 26.
+
+EMQX has been running on Erlang/OTP 26 since version 5.5, except for the Docker images, which stayed on Erlang/OTP 25.
+Now all releases run on Erlang/OTP 26.
+
+A known issue:
+When an older-version EMQX node joins a cluster with newer-version nodes,
+the older node's schema registry may encounter an issue that emits logs like the following:
+
+```
+Error loading module '$schema_parser___CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN':,
+This BEAM file was compiled for a later version of the runtime system than the current (Erlang/OTP 25).
+```
+
+This issue is fixed in newer versions; for older versions, however, a manual step is required.
+Execute the following on one of the clustered nodes before the older-version EMQX joins the cluster.
+
+```shell
+emqx eval 'lists:foreach(fun(Key) -> mnesia:dirty_delete(emqx_ee_schema_registry_protobuf_cache_tab, Key) end, mnesia:dirty_all_keys(emqx_ee_schema_registry_protobuf_cache_tab)).'
+```
+
+If the older-version EMQX is already in the cluster, execute the above command and then restart that node.
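+
+As an optional sanity check (a suggestion, not part of the original note), the cache table can be inspected before the join; it should report `0` after running the command above:
+
+```shell
+emqx eval 'mnesia:table_info(emqx_ee_schema_registry_protobuf_cache_tab, size).'
+```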

+ 1 - 0
changes/ce/fix-13276.en.md

@@ -0,0 +1 @@
+Fix an issue in durable message storage where parts of the internal storage state were not persisted when a new storage generation was set up (generations are used internally to manage message expiration and cleanup). This could manifest as messages being lost after a broker restart.

changes/ee/fix-13079.en.md → changes/ee/fix-13277.en.md


+ 1 - 1
mix.exs

@@ -214,7 +214,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

+ 3 - 0
rel/i18n/emqx_message_transformation_http_api.hocon

@@ -18,6 +18,9 @@ emqx_message_transformation_http_api {
   reorder_transformations.desc:
   """Reorder of all transformations"""
 
+  dryrun_transformation.desc:
+  """Test an input against a transformation"""
+
   enable_disable_transformation.desc:
   """Enable or disable a particular transformation"""