Просмотр исходного кода

fix(placeholder): add back support for nested keys inside binary encoded json

Fixes https://emqx.atlassian.net/browse/EMQX-10459
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
631f4ceac9

+ 71 - 3
apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl

@@ -32,6 +32,8 @@
 
 -define(WORKER_POOL_SIZE, 4).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -106,7 +108,7 @@ init_per_suite(Config) ->
 
 end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_conf]),
     ok.
 
 init_per_testcase(_Testcase, Config) ->
@@ -123,6 +125,7 @@ end_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     ok = snabbkaffe:stop(),
     delete_bridge(Config),
+    emqx_common_test_helpers:call_janitor(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -142,7 +145,7 @@ common_init(Config0) ->
             % Ensure EE bridge module is loaded
             _ = application:load(emqx_ee_bridge),
             _ = emqx_ee_bridge:module_info(),
-            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_rule_engine]),
             emqx_mgmt_api_test_util:init_suite(),
             % Connect to mysql directly and create the table
             connect_and_create_table(Config0),
@@ -212,9 +215,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
     Config.
 
 create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
     BridgeType = ?config(mysql_bridge_type, Config),
     Name = ?config(mysql_name, Config),
-    MysqlConfig = ?config(mysql_config, Config),
+    MysqlConfig0 = ?config(mysql_config, Config),
+    MysqlConfig = emqx_utils_maps:deep_merge(MysqlConfig0, Overrides),
     emqx_bridge:create(BridgeType, Name, MysqlConfig).
 
 delete_bridge(Config) ->
@@ -323,6 +330,26 @@ connect_and_clear_table(Config) ->
 connect_and_get_payload(Config) ->
     query_direct_mysql(Config, ?SQL_SELECT).
 
+create_rule_and_action_http(Config) ->
+    Name = ?config(mysql_name, Config),
+    Type = ?config(mysql_bridge_type, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"t/topic\"">>,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res0} ->
+            Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, Res};
+        Error ->
+            Error
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -776,3 +803,44 @@ t_table_removed(Config) ->
         []
     ),
     ok.
+
+t_nested_payload_template(Config) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Value = integer_to_binary(erlang:unique_integer()),
+    ?check_trace(
+        begin
+            connect_and_create_table(Config),
+            {ok, _} = create_bridge(
+                Config,
+                #{
+                    <<"sql">> =>
+                        "INSERT INTO mqtt_test(payload, arrived) "
+                        "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))"
+                }
+            ),
+            {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
+            ),
+            %% send message via rule action
+            Payload = emqx_utils_json:encode(#{value => Value}),
+            Message = emqx_message:make(Topic, Payload),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    emqx:publish(Message),
+                    #{?snk_kind := mysql_connector_query_return},
+                    10_000
+                ),
+            ?assertEqual(
+                {ok, [<<"payload">>], [[Value]]},
+                connect_and_get_payload(Config)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 8 - 1
apps/emqx_utils/src/emqx_placeholder.erl

@@ -257,7 +257,14 @@ quote_mysql(Str) ->
 
 lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] ->
     Value;
-lookup_var([Prop | Rest], Data) ->
+lookup_var([Prop | Rest], Data0) ->
+    Data =
+        case emqx_utils_json:safe_decode(Data0, [return_maps]) of
+            {ok, Data1} ->
+                Data1;
+            {error, _} ->
+                Data0
+        end,
     case lookup(Prop, Data) of
         {ok, Value} ->
             lookup_var(Rest, Value);

+ 9 - 0
apps/emqx_utils/test/emqx_placeholder_SUITE.erl

@@ -39,6 +39,15 @@ t_proc_tmpl_path(_) ->
         emqx_placeholder:proc_tmpl(Tks, Selected)
     ).
 
+t_proc_tmpl_path_encoded_json(_) ->
+    %% when we receive a message from the rule engine, it is a map with an encoded payload
+    Selected = #{payload => emqx_utils_json:encode(#{d1 => #{d2 => <<"hi">>}})},
+    Tks = emqx_placeholder:preproc_tmpl(<<"payload.d1.d2:${payload.d1.d2}">>),
+    ?assertEqual(
+        <<"payload.d1.d2:hi">>,
+        emqx_placeholder:proc_tmpl(Tks, Selected)
+    ).
+
 t_proc_tmpl_custom_ph(_) ->
     Selected = #{a => <<"a">>, b => <<"b">>},
     Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b}">>, #{placeholders => [<<"${a}">>]}),

+ 1 - 0
changes/ce/fix-11164.en.md

@@ -0,0 +1 @@
+Reintroduced support for nested (i.e.: `${payload.a.b.c}`) placeholders for extracting data from rule action messages without the need for calling `json_decode(payload)` first.