Преглед изворни кода

fix: write quoted strings to influxdb failed

Shawn пре 2 година
родитељ
комит
e3fee93d9f

+ 1 - 1
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl

@@ -382,7 +382,7 @@ field(Line) ->
 field_val([$" | Line]) ->
     {Val, [$" | Line1]} = unescape(?FIELD_VAL_ESC_CHARS, [$"], Line, []),
     %% Quoted val can be empty
-    {Val, strip_l(Line1, ?VAL_SEP)};
+    {{quoted, Val}, strip_l(Line1, ?VAL_SEP)};
 field_val(Line) ->
     %% Unquoted value should not be un-escaped according to InfluxDB protocol,
     %% as it can only hold float, integer, uinteger or boolean value.

+ 17 - 4
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -599,8 +599,17 @@ to_kv_config(KVfields) ->
 
 to_maps_config(K, V, Res) ->
     NK = emqx_placeholder:preproc_tmpl(bin(K)),
-    NV = emqx_placeholder:preproc_tmpl(bin(V)),
-    Res#{NK => NV}.
+    Res#{NK => preproc_quoted(V)}.
+
+preproc_quoted({quoted, V}) ->
+    {quoted, emqx_placeholder:preproc_tmpl(bin(V))};
+preproc_quoted(V) ->
+    emqx_placeholder:preproc_tmpl(bin(V)).
+
+proc_quoted({quoted, V}, Data, TransOpts) ->
+    {quoted, emqx_placeholder:proc_tmpl(V, Data, TransOpts)};
+proc_quoted(V, Data, TransOpts) ->
+    emqx_placeholder:proc_tmpl(V, Data, TransOpts).
 
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Data Trans
@@ -722,19 +731,23 @@ maps_config_to_data(K, V, {Data, Res}) ->
     KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
     VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
     NK = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
-    NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
+    NV = proc_quoted(V, Data, VTransOptions),
     case {NK, NV} of
         {[undefined], _} ->
             {Data, Res};
         %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
         {_, [undefined | _]} ->
             {Data, Res};
+        {_, {quoted, [undefined | _]}} ->
+            {Data, Res};
         _ ->
             {Data, Res#{
                 list_to_binary(NK) => value_type(NV, tmpl_type(V))
             }}
     end.
 
+value_type({quoted, ValList}, _) ->
+    {string_list, ValList};
 value_type([Int, <<"i">>], mixed) when is_integer(Int) ->
     {int, Int};
 value_type([UInt, <<"u">>], mixed) when is_integer(UInt) ->
@@ -778,7 +791,7 @@ value_type([Str], literal) when is_binary(Str) ->
             maybe_convert_to_float_str(Str)
     end;
 value_type(Str, _) ->
-    list_to_binary(Str).
+    Str.
 
 tmpl_type([{str, _}]) ->
     literal;

+ 46 - 10
apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl

@@ -445,6 +445,7 @@ query_by_clientid(ClientId, Config) ->
             query => Query,
             dialect => #{
                 header => true,
+                annotations => [<<"datatype">>],
                 delimiter => <<";">>
             }
         }),
@@ -456,6 +457,7 @@ query_by_clientid(ClientId, Config) ->
             _Timeout = 10_000,
             _Retry = 0
         ),
+    %ct:pal("raw body: ~p", [RawBody0]),
     RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)),
     {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
     DecodedCSV1 = [
@@ -465,21 +467,26 @@ query_by_clientid(ClientId, Config) ->
     DecodedCSV2 = csv_lines_to_maps(DecodedCSV1),
     index_by_field(DecodedCSV2).
 
-csv_lines_to_maps([Title | Rest]) ->
-    csv_lines_to_maps(Rest, Title, _Acc = []);
+csv_lines_to_maps([[<<"#datatype">> | DataType], Title | Rest]) ->
+    csv_lines_to_maps(Rest, Title, _Acc = [], DataType);
 csv_lines_to_maps([]) ->
     [].
 
-csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc) ->
+csv_lines_to_maps([[<<"_result">> | _] = Data | RestData], Title, Acc, DataType) ->
+    %ct:pal("data: ~p, title: ~p, datatype: ~p", [Data, Title, DataType]),
     Map = maps:from_list(lists:zip(Title, Data)),
-    csv_lines_to_maps(RestData, Title, [Map | Acc]);
+    MapT = lists:zip(Title, DataType),
+    [Type] = [T || {<<"_value">>, T} <- MapT],
+    csv_lines_to_maps(RestData, Title, [Map#{'_value_type' => Type} | Acc], DataType);
 %% ignore the csv title line
 %% it's always like this:
 %% [<<"result">>,<<"table">>,<<"_start">>,<<"_stop">>,
 %% <<"_time">>,<<"_value">>,<<"_field">>,<<"_measurement">>, Measurement],
-csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc) ->
-    csv_lines_to_maps(RestData, Title, Acc);
-csv_lines_to_maps([], _Title, Acc) ->
+csv_lines_to_maps([[<<"result">> | _] = _Title | RestData], Title, Acc, DataType) ->
+    csv_lines_to_maps(RestData, Title, Acc, DataType);
+csv_lines_to_maps([[<<"#datatype">> | DataType] | RestData], Title, Acc, _) ->
+    csv_lines_to_maps(RestData, Title, Acc, DataType);
+csv_lines_to_maps([], _Title, Acc, _DataType) ->
     lists:reverse(Acc).
 
 index_by_field(DecodedCSV) ->
@@ -494,11 +501,21 @@ assert_persisted_data(ClientId, Expected, PersistedData) ->
                     #{<<"_value">> := ExpectedValue},
                     maps:get(ClientIdIntKey, PersistedData)
                 );
+            (Key, {ExpectedValue, ExpectedType}) ->
+                ?assertMatch(
+                    #{<<"_value">> := ExpectedValue, '_value_type' := ExpectedType},
+                    maps:get(atom_to_binary(Key), PersistedData),
+                    #{
+                        key => Key,
+                        expected_value => ExpectedValue,
+                        expected_data_type => ExpectedType
+                    }
+                );
             (Key, ExpectedValue) ->
                 ?assertMatch(
                     #{<<"_value">> := ExpectedValue},
                     maps:get(atom_to_binary(Key), PersistedData),
-                    #{expected => ExpectedValue}
+                    #{key => Key, expected_value => ExpectedValue}
                 )
         end,
         Expected
@@ -689,7 +706,15 @@ t_const_timestamp(Config) ->
             Config,
             #{
                 <<"write_syntax">> =>
-                    <<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
+                    <<
+                        "mqtt,clientid=${clientid} "
+                        "foo=${payload.foo}i,"
+                        "foo1=${payload.foo},"
+                        "foo2=\"${payload.foo}\","
+                        "foo3=\"${payload.foo}somestr\","
+                        "bar=5i,baz0=1.1,baz1=\"a\",baz2=\"ai\",baz3=\"au\",baz4=\"1u\" ",
+                        ConstBin/binary
+                    >>
             }
         )
     ),
@@ -709,7 +734,18 @@ t_const_timestamp(Config) ->
     end,
     ct:sleep(1500),
     PersistedData = query_by_clientid(ClientId, Config),
-    Expected = #{foo => <<"123">>},
+    Expected = #{
+        foo => {<<"123">>, <<"long">>},
+        foo1 => {<<"123">>, <<"double">>},
+        foo2 => {<<"123">>, <<"string">>},
+        foo3 => {<<"123somestr">>, <<"string">>},
+        bar => {<<"5">>, <<"long">>},
+        baz0 => {<<"1.1">>, <<"double">>},
+        baz1 => {<<"a">>, <<"string">>},
+        baz2 => {<<"ai">>, <<"string">>},
+        baz3 => {<<"au">>, <<"string">>},
+        baz4 => {<<"1u">>, <<"string">>}
+    },
     assert_persisted_data(ClientId, Expected, PersistedData),
     TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)),
     TimeReturned = pad_zero(TimeReturned0),

+ 60 - 16
apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_tests.erl

@@ -102,27 +102,51 @@
         #{
             measurement => "m7",
             tags => [{"tag", "tag7"}, {"tag_a", "\"tag7a\""}, {"tag_b", "tag7b"}],
-            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b"}],
+            fields => [
+                {"field", {quoted, "field7"}},
+                {"field_a", "field7a"},
+                {"field_b", {quoted, "field7b"}}
+            ],
             timestamp => undefined
         }},
     {"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"field8b\" ${timestamp8}",
         #{
             measurement => "m8",
             tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "field8b"}],
+            fields => [
+                {"field", {quoted, "field8"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "field8b"}}
+            ],
             timestamp => "${timestamp8}"
         }},
+    {
+        "m8a,tag=tag8,tag_a=\"${tag8a}\",tag_b=tag8b field=\"${field8}\","
+        "field_a=field8a,field_b=\"${field8b}\" ${timestamp8}",
+        #{
+            measurement => "m8a",
+            tags => [{"tag", "tag8"}, {"tag_a", "\"${tag8a}\""}, {"tag_b", "tag8b"}],
+            fields => [
+                {"field", {quoted, "${field8}"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "${field8b}"}}
+            ],
+            timestamp => "${timestamp8}"
+        }
+    },
     {"m9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
         #{
             measurement => "m9",
             tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [{"field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            fields => [
+                {"field", {quoted, "field9"}}, {"field_a", "field9a"}, {"field_b", {quoted, ""}}
+            ],
             timestamp => "${timestamp9}"
         }},
     {"m10 field=\"\" ${timestamp10}", #{
         measurement => "m10",
         tags => [],
-        fields => [{"field", ""}],
+        fields => [{"field", {quoted, ""}}],
         timestamp => "${timestamp10}"
     }}
 ]).
@@ -177,19 +201,19 @@
     {"m2,tag=tag2 field=\"field \\\"2\\\",\n\"", #{
         measurement => "m2",
         tags => [{"tag", "tag2"}],
-        fields => [{"field", "field \"2\",\n"}],
+        fields => [{"field", {quoted, "field \"2\",\n"}}],
         timestamp => undefined
     }},
     {"m\\ 3 field=\"field3\" ${payload.timestamp\\ 3}", #{
         measurement => "m 3",
         tags => [],
-        fields => [{"field", "field3"}],
+        fields => [{"field", {quoted, "field3"}}],
         timestamp => "${payload.timestamp 3}"
     }},
     {"m4 field=\"\\\"field\\\\4\\\"\"", #{
         measurement => "m4",
         tags => [],
-        fields => [{"field", "\"field\\4\""}],
+        fields => [{"field", {quoted, "\"field\\4\""}}],
         timestamp => undefined
     }},
     {
@@ -208,7 +232,11 @@
         #{
             measurement => "m6",
             tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            fields => [
+                {"field", {quoted, "field6"}},
+                {"field_a", {quoted, "field6a"}},
+                {"field_b", {quoted, "field6b"}}
+            ],
             timestamp => undefined
         }},
     {
@@ -217,7 +245,11 @@
         #{
             measurement => "  m7  ",
             tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
-            fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
+            fields => [
+                {"field", {quoted, "field7"}},
+                {"field_a", "field7a"},
+                {"field_b", {quoted, "field7b\\\n"}}
+            ],
             timestamp => undefined
         }
     },
@@ -227,7 +259,11 @@
         #{
             measurement => "m8",
             tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
-            fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
+            fields => [
+                {"field", {quoted, "field8"}},
+                {"field_a", "field8a"},
+                {"field_b", {quoted, "\"field\" = 8b"}}
+            ],
             timestamp => "${timestamp8}"
         }
     },
@@ -235,14 +271,18 @@
         #{
             measurement => "m\\9",
             tags => [{"tag", "tag9"}, {"tag_a", "\"tag9a\""}, {"tag_b", "tag9b"}],
-            fields => [{"field=field", "field9"}, {"field_a", "field9a"}, {"field_b", ""}],
+            fields => [
+                {"field=field", {quoted, "field9"}},
+                {"field_a", "field9a"},
+                {"field_b", {quoted, ""}}
+            ],
             timestamp => "${timestamp9}"
         }},
     {"m\\,10 \"field\\\\\"=\"\" ${timestamp10}", #{
         measurement => "m,10",
         tags => [],
         %% backslash should not be un-escaped in tag key
-        fields => [{"\"field\\\\\"", ""}],
+        fields => [{"\"field\\\\\"", {quoted, ""}}],
         timestamp => "${timestamp10}"
     }}
 ]).
@@ -257,19 +297,19 @@
     {" m2,tag=tag2   field=\"field \\\"2\\\",\n\"  ", #{
         measurement => "m2",
         tags => [{"tag", "tag2"}],
-        fields => [{"field", "field \"2\",\n"}],
+        fields => [{"field", {quoted, "field \"2\",\n"}}],
         timestamp => undefined
     }},
     {"  m\\ 3   field=\"field3\"   ${payload.timestamp\\ 3}  ", #{
         measurement => "m 3",
         tags => [],
-        fields => [{"field", "field3"}],
+        fields => [{"field", {quoted, "field3"}}],
         timestamp => "${payload.timestamp 3}"
     }},
     {"   m4       field=\"\\\"field\\\\4\\\"\"    ", #{
         measurement => "m4",
         tags => [],
-        fields => [{"field", "\"field\\4\""}],
+        fields => [{"field", {quoted, "\"field\\4\""}}],
         timestamp => undefined
     }},
     {
@@ -288,7 +328,11 @@
         #{
             measurement => "m6",
             tags => [{"tag", "tag6"}, {"tag_a", "tag6a"}, {"tag_b", "tag6b"}],
-            fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
+            fields => [
+                {"field", {quoted, "field6"}},
+                {"field_a", {quoted, "field6a"}},
+                {"field_b", {quoted, "field6b"}}
+            ],
             timestamp => undefined
         }}
 ]).