|
|
@@ -454,24 +454,26 @@ query_by_clientid(ClientId, Config) ->
|
|
|
{ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
|
|
|
DecodedCSV1 = [
|
|
|
[Field || Field <- Line, Field =/= <<>>]
|
|
|
- || Line <- DecodedCSV0,
|
|
|
- Line =/= [<<>>]
|
|
|
+ || Line <- DecodedCSV0, Line =/= [<<>>]
|
|
|
],
|
|
|
- DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
|
|
|
+ DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
|
|
|
index_by_field(DecodedCSV2).
|
|
|
|
|
|
-decode_csv(RawBody) ->
|
|
|
- Lines =
|
|
|
- [
|
|
|
- binary:split(Line, [<<";">>], [global, trim_all])
|
|
|
- || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
|
|
|
- ],
|
|
|
- csv_lines_to_maps(Lines, []).
|
|
|
+csv_lines_to_maps([Title | Rest]) ->
|
|
|
+ csv_lines_to_maps(Rest, Title, _Acc = []);
|
|
|
+csv_lines_to_maps([]) ->
|
|
|
+ [].
|
|
|
|
|
|
-csv_lines_to_maps([Fields, Data | Rest], Acc) ->
|
|
|
- Map = maps:from_list(lists:zip(Fields, Data)),
|
|
|
- csv_lines_to_maps(Rest, [Map | Acc]);
|
|
|
-csv_lines_to_maps(_Data, Acc) ->
|
|
|
+csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
|
|
|
+ Map = maps:from_list(lists:zip(Title, Data)),
|
|
|
+ csv_lines_to_maps(RestData, Title, [Map | Acc]);
|
|
|
+%% ignore the csv title line
|
|
|
+%% it's always like this:
|
|
|
+%% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
|
|
|
+%% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
|
|
|
+csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
|
|
|
+ csv_lines_to_maps(RestData, Title, Acc);
|
|
|
+csv_lines_to_maps([], _Title, Acc) ->
|
|
|
lists:reverse(Acc).
|
|
|
|
|
|
index_by_field(DecodedCSV) ->
|
|
|
@@ -768,6 +770,53 @@ t_boolean_variants(Config) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
+t_any_num_as_float(Config) ->
|
|
|
+ QueryMode = ?config(query_mode, Config),
|
|
|
+ Const = erlang:system_time(nanosecond),
|
|
|
+ ConstBin = integer_to_binary(Const),
|
|
|
+ TsStr = iolist_to_binary(
|
|
|
+ calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, _},
|
|
|
+ create_bridge(
|
|
|
+ Config,
|
|
|
+ #{
|
|
|
+ <<"write_syntax">> =>
|
|
|
+ <<"mqtt,clientid=${clientid}", " ",
|
|
|
+ "float_no_dp=${payload.float_no_dp},float_dp=${payload.float_dp},bar=5i ",
|
|
|
+ ConstBin/binary>>
|
|
|
+ }
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
+ Payload = #{
|
|
|
+ %% no decimal point
|
|
|
+ float_no_dp => 123,
|
|
|
+ %% with decimal point
|
|
|
+ float_dp => 123.0
|
|
|
+ },
|
|
|
+ SentData = #{
|
|
|
+ <<"clientid">> => ClientId,
|
|
|
+ <<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
|
|
+ <<"payload">> => Payload,
|
|
|
+ <<"timestamp">> => erlang:system_time(millisecond)
|
|
|
+ },
|
|
|
+ case QueryMode of
|
|
|
+ sync ->
|
|
|
+ ?assertMatch({ok, 204, _}, send_message(Config, SentData)),
|
|
|
+ ok;
|
|
|
+ async ->
|
|
|
+ ?assertMatch(ok, send_message(Config, SentData)),
|
|
|
+ ct:sleep(500)
|
|
|
+ end,
|
|
|
+ PersistedData = query_by_clientid(ClientId, Config),
|
|
|
+ Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
|
|
|
+ assert_persisted_data(ClientId, Expected, PersistedData),
|
|
|
+ TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"float_no_dp">>, PersistedData)),
|
|
|
+ TimeReturned = pad_zero(TimeReturned0),
|
|
|
+ ?assertEqual(TsStr, TimeReturned).
|
|
|
+
|
|
|
t_bad_timestamp(Config) ->
|
|
|
InfluxDBType = ?config(influxdb_type, Config),
|
|
|
InfluxDBName = ?config(influxdb_name, Config),
|