Ver código fonte

fix(iotdb): handle unexpected payload format

firest 1 ano atrás
pai
commit
5292331b9a

+ 46 - 34
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -705,9 +705,17 @@ get_payload(Payload) ->
     Payload.
 
 parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
-    ParsedPayload;
+    {ok, ParsedPayload};
 parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
-    emqx_utils_json:decode(UnparsedPayload).
+    try emqx_utils_json:decode(UnparsedPayload) of
+        Term when is_map(Term) -> {ok, Term};
+        _ ->
+            %% a plain text will be returned as it is, but here we hope it is a map
+            {error, invalid_data}
+    catch
+        _:_ ->
+            {error, invalid_data}
+    end.
 
 iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
@@ -901,38 +909,42 @@ do_render_record([{_, Msg} | Msgs], Channel, Acc) ->
     end.
 
 render_channel_record(#{data := DataTemplate} = Channel, Msg) ->
-    Payload = parse_payload(get_payload(Msg)),
-    case device_id(Msg, Payload, Channel) of
-        undefined ->
-            {error, device_id_missing};
-        DeviceId ->
-            #{timestamp := TimestampTkn} = hd(DataTemplate),
-            NowNS = erlang:system_time(nanosecond),
-            Nows = #{
-                now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
-                now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
-                now_ns => NowNS
-            },
-            case
-                proc_record_data(
-                    DataTemplate,
-                    Msg,
-                    [],
-                    [],
-                    []
-                )
-            of
-                {ok, MeasurementAcc, TypeAcc, ValueAcc} ->
-                    {ok, #{
-                        timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
-                        measurements => MeasurementAcc,
-                        data_types => TypeAcc,
-                        values => ValueAcc,
-                        device_id => DeviceId
-                    }};
-                Error ->
-                    Error
-            end
+    case parse_payload(get_payload(Msg)) of
+        {ok, Payload} ->
+            case device_id(Msg, Payload, Channel) of
+                undefined ->
+                    {error, device_id_missing};
+                DeviceId ->
+                    #{timestamp := TimestampTkn} = hd(DataTemplate),
+                    NowNS = erlang:system_time(nanosecond),
+                    Nows = #{
+                        now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
+                        now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
+                        now_ns => NowNS
+                    },
+                    case
+                        proc_record_data(
+                            DataTemplate,
+                            Msg,
+                            [],
+                            [],
+                            []
+                        )
+                    of
+                        {ok, MeasurementAcc, TypeAcc, ValueAcc} ->
+                            {ok, #{
+                                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
+                                measurements => MeasurementAcc,
+                                data_types => TypeAcc,
+                                values => ValueAcc,
+                                device_id => DeviceId
+                            }};
+                        Error ->
+                            Error
+                    end
+            end;
+        Error ->
+            Error
     end.
 
 proc_record_data(

+ 17 - 0
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -644,6 +644,23 @@ t_sync_query_with_lowercase(Config) ->
         emqx_utils_json:decode(IoTDBResult)
     ).
 
+t_sync_query_plain_text(Config) ->
+    IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end,
+
+    Payload1 = <<"this is a text">>,
+    MakeMessageFun1 = make_message_fun(iotdb_topic(Config), Payload1),
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun1, IsInvalidType, iotdb_bridge_on_query
+    ).
+
+t_sync_query_invalid_json(Config) ->
+    IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end,
+    Payload2 = <<"{\"msg\":}">>,
+    MakeMessageFun2 = make_message_fun(iotdb_topic(Config), Payload2),
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun2, IsInvalidType, iotdb_bridge_on_query
+    ).
+
 is_empty(null) -> true;
 is_empty([]) -> true;
 is_empty([[]]) -> true;