Parcourir la source

fix: IoTDB bridge incoming payload needs to be parsed as JSON

There was an incorrect assumption that the data incoming to the IoTDB
bridge has already been parsed. This is fixed by parsing the payload as
JSON data if the payload is not already a map.

Fixes:
https://emqx.atlassian.net/browse/EMQX-9854
Kjell Winblad il y a 2 ans
Parent
commit
d19ddb1832

+ 1 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {modules, [
     {modules, [
         emqx_bridge_iotdb,
         emqx_bridge_iotdb,
         emqx_bridge_iotdb_impl
         emqx_bridge_iotdb_impl

+ 42 - 19
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
 %% Internal Functions
 %% Internal Functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-preproc_data(DataList) ->
+make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
+    emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
+make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
+    lists:map(fun make_parsed_payload/1, PayloadUnparsed);
+make_parsed_payload(
+    #{
+        measurement := Measurement,
+        data_type := DataType,
+        value := Value
+    } = Data
+) ->
+    Data#{
+        <<"measurement">> => Measurement,
+        <<"data_type">> => DataType,
+        <<"value">> => Value
+    }.
+
+preproc_data(
+    #{
+        <<"measurement">> := Measurement,
+        <<"data_type">> := DataType,
+        <<"value">> := Value
+    } = Data
+) ->
+    #{
+        timestamp => emqx_plugin_libs_rule:preproc_tmpl(
+            maps:get(<<"timestamp">>, Data, <<"now">>)
+        ),
+        measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
+        data_type => DataType,
+        value => emqx_plugin_libs_rule:preproc_tmpl(Value)
+    }.
+
+preproc_data_list(DataList) ->
     lists:map(
     lists:map(
-        fun(
-            #{
-                measurement := Measurement,
-                data_type := DataType,
-                value := Value
-            } = Data
-        ) ->
-            #{
-                timestamp => emqx_plugin_libs_rule:preproc_tmpl(
-                    maps:get(<<"timestamp">>, Data, <<"now">>)
-                ),
-                measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
-                data_type => DataType,
-                value => emqx_plugin_libs_rule:preproc_tmpl(Value)
-            }
-        end,
+        fun preproc_data/1,
         DataList
         DataList
     ).
     ).
 
 
@@ -258,12 +276,15 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
 convert_float(undefined) ->
     null.
     null.
 
 
-make_iotdb_insert_request(Message, State) ->
+make_iotdb_insert_request(MessageUnparsedPayload, State) ->
+    PayloadUnparsed = maps:get(payload, MessageUnparsedPayload),
+    PayloadParsed = make_parsed_payload(PayloadUnparsed),
+    Message = MessageUnparsedPayload#{payload => PayloadParsed},
     IsAligned = maps:get(is_aligned, State, false),
     IsAligned = maps:get(is_aligned, State, false),
     DeviceId = device_id(Message, State),
     DeviceId = device_id(Message, State),
     IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
     IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X),
     Payload = make_list(maps:get(payload, Message)),
     Payload = make_list(maps:get(payload, Message)),
-    PreProcessedData = preproc_data(Payload),
+    PreProcessedData = preproc_data_list(Payload),
     DataList = proc_data(PreProcessedData, Message),
     DataList = proc_data(PreProcessedData, Message),
     InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
     InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
     Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
     Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
@@ -350,6 +371,8 @@ device_id(Message, State) ->
     case maps:get(device_id, State, undefined) of
     case maps:get(device_id, State, undefined) of
         undefined ->
         undefined ->
             case maps:get(payload, Message) of
             case maps:get(payload, Message) of
+                #{<<"device_id">> := DeviceId} ->
+                    DeviceId;
                 #{device_id := DeviceId} ->
                 #{device_id := DeviceId} ->
                     DeviceId;
                     DeviceId;
                 _NotFound ->
                 _NotFound ->