Pārlūkot izejas kodu

Merge pull request #11223 from JimMoen/fix-influxdb-float-value-trans

fix: influxdb float serialization error
JianBo He 2 gadi atpakaļ
vecāks
revīzija
ae59a26659

+ 4 - 2
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -638,8 +638,10 @@ value_type([UInt, <<"u">>]) when
     is_integer(UInt)
 ->
     {uint, UInt};
-value_type([Float]) when is_float(Float) ->
-    Float;
+%% write `1`, `1.0`, `-1.0` all as float
+%% see also: https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float
+value_type([Number]) when is_number(Number) ->
+    Number;
 value_type([<<"t">>]) ->
     't';
 value_type([<<"T">>]) ->

+ 63 - 14
apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl

@@ -454,24 +454,26 @@ query_by_clientid(ClientId, Config) ->
     {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
     DecodedCSV1 = [
         [Field || Field <- Line, Field =/= <<>>]
-     || Line <- DecodedCSV0,
-        Line =/= [<<>>]
+     || Line <- DecodedCSV0, Line =/= [<<>>]
     ],
-    DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
+    DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
     index_by_field(DecodedCSV2).
 
-decode_csv(RawBody) ->
-    Lines =
-        [
-            binary:split(Line, [<<";">>], [global, trim_all])
-         || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
-        ],
-    csv_lines_to_maps(Lines, []).
+csv_lines_to_maps([Title | Rest]) ->
+    csv_lines_to_maps(Rest, Title, _Acc = []);
+csv_lines_to_maps([]) ->
+    [].
 
-csv_lines_to_maps([Fields, Data | Rest], Acc) ->
-    Map = maps:from_list(lists:zip(Fields, Data)),
-    csv_lines_to_maps(Rest, [Map | Acc]);
-csv_lines_to_maps(_Data, Acc) ->
+csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
+    Map = maps:from_list(lists:zip(Title, Data)),
+    csv_lines_to_maps(RestData, Title, [Map | Acc]);
+%% ignore the csv title line
+%% it's always like this:
+%% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
+%% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
+csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
+    csv_lines_to_maps(RestData, Title, Acc);
+csv_lines_to_maps([], _Title, Acc) ->
     lists:reverse(Acc).
 
 index_by_field(DecodedCSV) ->
@@ -768,6 +770,53 @@ t_boolean_variants(Config) ->
     ),
     ok.
 
+t_any_num_as_float(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    Const = erlang:system_time(nanosecond),
+    ConstBin = integer_to_binary(Const),
+    TsStr = iolist_to_binary(
+        calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
+    ),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(
+            Config,
+            #{
+                <<"write_syntax">> =>
+                    <<"mqtt,clientid=${clientid}", " ",
+                        "float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ",
+                        ConstBin/binary>>
+            }
+        )
+    ),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Payload = #{
+        %% no decimal point
+        float_no_dp => 123,
+        %% with decimal point
+        float_dp => 123.0
+    },
+    SentData = #{
+        <<"clientid">> => ClientId,
+        <<"topic">> => atom_to_binary(?FUNCTION_NAME),
+        <<"payload">> => Payload,
+        <<"timestamp">> => erlang:system_time(millisecond)
+    },
+    case QueryMode of
+        sync ->
+            ?assertMatch({ok, 204, _}, send_message(Config, SentData)),
+            ok;
+        async ->
+            ?assertMatch(ok, send_message(Config, SentData)),
+            ct:sleep(500)
+    end,
+    PersistedData = query_by_clientid(ClientId, Config),
+    Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
+    assert_persisted_data(ClientId, Expected, PersistedData),
+    TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"float_no_dp">>, PersistedData)),
+    TimeReturned = pad_zero(TimeReturned0),
+    ?assertEqual(TsStr, TimeReturned).
+
 t_bad_timestamp(Config) ->
     InfluxDBType = ?config(influxdb_type, Config),
     InfluxDBName = ?config(influxdb_name, Config),

+ 5 - 0
changes/ee/fix-11223.en.md

@@ -0,0 +1,5 @@
+In InfluxDB bridging, if intend to write using the float data type but the placeholder represents the original value
+as an integer without a decimal point during serialization, it will result in the failure of Influx Line Protocol serialization
+and the inability to write to the InfluxDB bridge.
+
+See also: [InfluxDB v2.7 Line-Protocol](https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float)