Quellcode durchsuchen

fix(rule_maps): avoid losing data when using `emqx_rule_maps:nested_put`

Fixes https://emqx.atlassian.net/browse/EMQX-10541
Thales Macedo Garitezi vor 2 Jahren
Ursprung
Commit
bffef386c1

+ 13 - 3
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -10,6 +10,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 %% ct setup helpers
 
 init_per_suite(Config, Apps) ->
@@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
     Res.
 
 create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
+    create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
+
+create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
     BridgeName = ?config(bridge_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
+    SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
     Params = #{
         enable => true,
-        sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>,
+        sql => SQL,
         actions => [BridgeId]
     },
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
     ct:pal("rule action params: ~p", [Params]),
     case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
-        Error -> Error
+        {ok, Res0} ->
+            Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, Res};
+        Error ->
+            Error
     end.
 
 %%------------------------------------------------------------------------------

+ 35 - 0
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) ->
         iotdb_bridge_on_query
     ).
 
+t_extract_device_id_from_rule_engine_message(Config) ->
+    BridgeType = ?config(bridge_type, Config),
+    RuleTopic = <<"t/iotdb">>,
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
+    Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
+    ?check_trace(
+        begin
+            {ok, _} = emqx_bridge_testlib:create_bridge(Config),
+            SQL = <<
+                "SELECT\n"
+                "  payload.measurement, payload.data_type, payload.value, payload.device_id\n"
+                "FROM\n"
+                "  \"",
+                RuleTopic/binary,
+                "\""
+            >>,
+            Opts = #{sql => SQL},
+            {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
+                BridgeType, RuleTopic, Config, Opts
+            ),
+            emqx:publish(Message),
+            ?block_until(handle_async_reply, 5_000),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{action := ack, result := {ok, 200, _, _}}],
+                ?of_kind(handle_async_reply, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 t_sync_invalid_data(Config) ->
     emqx_bridge_testlib:t_sync_query(
         Config,

+ 9 - 0
apps/emqx_rule_engine/src/emqx_rule_maps.erl

@@ -129,6 +129,15 @@ general_find({index, _}, List, _OrgData, Handler) when not is_list(List) ->
 
 do_put({key, Key}, Val, Map, _OrgData) when is_map(Map) ->
     maps:put(Key, Val, Map);
+do_put({key, Key}, Val, Data, _OrgData) when is_binary(Data) ->
+    case emqx_utils_json:safe_decode(Data, [return_maps]) of
+        {ok, Map = #{}} ->
+            %% Avoid losing other keys when the data is an encoded map...
+            Map#{Key => Val};
+        _ ->
+            %% Fallback to the general case otherwise.
+            #{Key => Val}
+    end;
 do_put({key, Key}, Val, Data, _OrgData) when not is_map(Data) ->
     #{Key => Val};
 do_put({index, {const, Index}}, Val, List, _OrgData) ->

+ 19 - 1
apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl

@@ -71,7 +71,25 @@ t_nested_put_map(_) ->
     ?assertEqual(
         #{k => #{<<"t">> => #{<<"a">> => v1}}},
         nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}})
-    ).
+    ),
+    %% since we currently support passing a binary-encoded json as input...
+    ?assertEqual(
+        #{payload => #{<<"a">> => v1, <<"b">> => <<"v2">>}},
+        nested_put(
+            ?path([payload, <<"a">>]),
+            v1,
+            #{payload => emqx_utils_json:encode(#{b => <<"v2">>})}
+        )
+    ),
+    ?assertEqual(
+        #{payload => #{<<"a">> => #{<<"old">> => <<"v2">>, <<"new">> => v1}}},
+        nested_put(
+            ?path([payload, <<"a">>, <<"new">>]),
+            v1,
+            #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})}
+        )
+    ),
+    ok.
 
 t_nested_put_index(_) ->
     ?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])),