Просмотр исходного кода

Merge pull request #11266 from lafirest/fix/influxdb-ph

fix(tdengine): fix SQL template errors
lafirest 2 лет назад
Родитель
Сommit
f6834b33a0

+ 8 - 1
.github/workflows/run_jmeter_tests.yaml

@@ -17,8 +17,15 @@ jobs:
     - uses: erlef/setup-beam@v1.15.4
       with:
         otp-version: 25.3.2
+    - name: Cache Jmeter
+      id: cache-jmeter
+      uses: actions/cache@v3
+      with:
+        path: /tmp/apache-jmeter.tgz
+        key: apache-jmeter-5.4.3.tgz
     - name: download jmeter
-      timeout-minutes: 3
+      if: steps.cache-jmeter.outputs.cache-hit != 'true'
+      timeout-minutes: 15
       env:
           JMETER_VERSION: 5.4.3
       run: |

+ 11 - 27
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -124,7 +124,7 @@ on_query(InstanceId, {query, SQL}, State) ->
 on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
     case maps:find(Key, InsertTksMap) of
         {ok, Tokens} when is_map(Data) ->
-            SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
+            SQL = emqx_placeholder:proc_tmpl(Tokens, Data),
             do_query(InstanceId, SQL, State);
         _ ->
             {error, {unrecoverable_error, invalid_request}}
@@ -209,31 +209,16 @@ execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).
 
 do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
-    Queries = aggregate_query(Tokens, BatchReqs),
-    SQL = maps:fold(
-        fun(InsertPart, Values, Acc) ->
-            lists:foldl(
-                fun(ValuePart, IAcc) ->
-                    <<IAcc/binary, " ", ValuePart/binary>>
-                end,
-                <<Acc/binary, " ", InsertPart/binary, " VALUES">>,
-                Values
-            )
-        end,
-        <<"INSERT INTO">>,
-        Queries
-    ),
+    SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
     execute(Conn, SQL, Opts).
 
-aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
+aggregate_query(BatchTks, BatchReqs, Acc) ->
     lists:foldl(
-        fun({_, Data}, Acc) ->
-            InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
-            ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
-            Values = maps:get(InsertPart, Acc, []),
-            maps:put(InsertPart, [ParamsPart | Values], Acc)
+        fun({_, Data}, InAcc) ->
+            InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data),
+            <<InAcc/binary, " ", InsertPart/binary>>
         end,
-        #{},
+        Acc,
         BatchReqs
     ).
 
@@ -260,13 +245,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
             InsertTks = emqx_placeholder:preproc_tmpl(H),
             H1 = string:trim(H, trailing, ";"),
             case split_insert_sql(H1) of
-                [_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
-                    InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
-                    ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
+                [_InsertPart, BatchDesc] ->
+                    BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc),
                     parse_batch_prepare_sql(
                         T,
                         InsertTksMap#{Key => InsertTks},
-                        BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
+                        BatchTksMap#{Key => BatchTks}
                     );
                 Result ->
                     ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
@@ -299,7 +283,7 @@ split_insert_sql(SQL0) ->
                     {true, E1}
             end
         end,
-        re:split(SQL, "(?i)(insert into)|(?i)(values)")
+        re:split(SQL, "(?i)(insert into)")
     ).
 
 formalize_sql(Input) ->

+ 86 - 65
apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl

@@ -13,7 +13,8 @@
 
 % SQL definitions
 -define(SQL_BRIDGE,
-    "insert into mqtt.t_mqtt_msg(ts, payload) values (${timestamp}, ${payload})"
+    "insert into t_mqtt_msg(ts, payload) values (${timestamp}, '${payload}')"
+    "t_mqtt_msg(ts, payload) values (${second_ts}, '${payload}')"
 ).
 
 -define(SQL_CREATE_DATABASE, "CREATE DATABASE IF NOT EXISTS mqtt; USE mqtt;").
@@ -29,7 +30,8 @@
 -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").
 
 -define(AUTO_CREATE_BRIDGE,
-    "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})"
+    "insert into ${clientid} USING s_tab TAGS ('${clientid}') values (${timestamp}, '${payload}')"
+    "test_${clientid} USING s_tab TAGS ('${clientid}') values (${second_ts}, '${payload}')"
 ).
 
 -define(SQL_CREATE_STABLE,
@@ -301,7 +303,7 @@ connect_and_clear_table(Config) ->
 
 connect_and_get_payload(Config) ->
     ?WITH_CON(
-        {ok, #{<<"code">> := 0, <<"data">> := [[Result]]}} = directly_query(Con, ?SQL_SELECT)
+        {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, ?SQL_SELECT)
     ),
     Result.
 
@@ -329,7 +331,7 @@ t_setup_via_config_and_publish(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     ?check_trace(
         begin
             {_, {ok, #{result := Result}}} =
@@ -342,7 +344,7 @@ t_setup_via_config_and_publish(Config) ->
                 {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
             ),
             ?assertMatch(
-                ?PAYLOAD,
+                [[?PAYLOAD], [?PAYLOAD]],
                 connect_and_get_payload(Config)
             ),
             ok
@@ -368,7 +370,8 @@ t_setup_via_http_api_and_publish(Config) ->
         {ok, _},
         create_bridge_http(TDengineConfig)
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     ?check_trace(
         begin
             Request = {send_message, SentData},
@@ -386,7 +389,7 @@ t_setup_via_http_api_and_publish(Config) ->
                 {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0
             ),
             ?assertMatch(
-                ?PAYLOAD,
+                [[?PAYLOAD], [?PAYLOAD]],
                 connect_and_get_payload(Config)
             ),
             ok
@@ -426,7 +429,7 @@ t_write_failure(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     {ok, _} = create_bridge(Config),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         {_, {ok, #{result := Result}}} =
             ?wait_async_action(
@@ -461,7 +464,7 @@ t_write_timeout(Config) ->
             }
         }
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     %% FIXME: TDengine connector hangs indefinetily during
     %% `call_query' while the connection is unresponsive.  Should add
     %% a timeout to `APPLY_RESOURCE' in buffer worker??
@@ -486,7 +489,7 @@ t_simple_sql_query(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {query, <<"SELECT count(1) AS T">>},
+    Request = {query, <<"SELECT 1 AS T">>},
     {_, {ok, #{result := Result}}} =
         ?wait_async_action(
             query_resource(Config, Request),
@@ -537,37 +540,41 @@ t_bad_sql_parameter(Config) ->
     ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
     ok.
 
-t_nasty_sql_string(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
-    ),
-    % NOTE
-    % Column `payload` has BINARY type, so we would certainly like to test it
-    % with `lists:seq(1, 127)`, but:
-    % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
-    %    parser[1] has no escaping sequence for it so a zero byte probably confuses
-    %    interpreter somewhere down the line.
-    % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
-    %    some reason.
-    %
-    % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
-    Payload = list_to_binary(lists:seq(1, 127)),
-    Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
-    {_, {ok, #{result := Result}}} =
-        ?wait_async_action(
-            send_message(Config, Message),
-            #{?snk_kind := buffer_worker_flush_ack},
-            2_000
-        ),
-    ?assertMatch(
-        {ok, #{<<"code">> := 0, <<"rows">> := 1}},
-        Result
-    ),
-    ?assertEqual(
-        Payload,
-        connect_and_get_payload(Config)
-    ).
+%% TODO
+%% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders,
+%% the SQL quote(escape) is removed now,
+%% we should introduce a new syntax for placeholders to allow some vars to keep unquote.
+%% t_nasty_sql_string(Config) ->
+%%     ?assertMatch(
+%%         {ok, _},
+%%         create_bridge(Config)
+%%     ),
+%%     % NOTE
+%%     % Column `payload` has BINARY type, so we would certainly like to test it
+%%     % with `lists:seq(1, 127)`, but:
+%%     % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
+%%     %    parser[1] has no escaping sequence for it so a zero byte probably confuses
+%%     %    interpreter somewhere down the line.
+%%     % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
+%%     %    some reason.
+%%     %
+%%     % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
+%%     Payload = list_to_binary(lists:seq(1, 127)),
+%%     Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
+%%     {_, {ok, #{result := Result}}} =
+%%         ?wait_async_action(
+%%             send_message(Config, Message),
+%%             #{?snk_kind := buffer_worker_flush_ack},
+%%             2_000
+%%         ),
+%%     ?assertMatch(
+%%         {ok, #{<<"code">> := 0, <<"rows">> := 1}},
+%%         Result
+%%     ),
+%%     ?assertEqual(
+%%         Payload,
+%%         connect_and_get_payload(Config)
+%%     ).
 
 t_simple_insert(Config) ->
     connect_and_clear_table(Config),
@@ -576,7 +583,7 @@ t_simple_insert(Config) ->
         create_bridge(Config)
     ),
 
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     Request = {send_message, SentData},
     {_, {ok, #{result := _Result}}} =
         ?wait_async_action(
@@ -585,7 +592,7 @@ t_simple_insert(Config) ->
             2_000
         ),
     ?assertMatch(
-        ?PAYLOAD,
+        [[?PAYLOAD], [?PAYLOAD]],
         connect_and_get_payload(Config)
     ).
 
@@ -602,7 +609,9 @@ t_batch_insert(Config) ->
         ?wait_async_action(
             lists:foreach(
                 fun(Idx) ->
-                    SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx},
+                    SentData = #{
+                        payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000
+                    },
                     Request = {send_message, SentData},
                     query_resource(Config, Request)
                 end,
@@ -613,11 +622,12 @@ t_batch_insert(Config) ->
             2_000
         ),
 
+    DoubleSize = Size * 2,
     ?retry(
         _Sleep = 50,
         _Attempts = 30,
         ?assertMatch(
-            [[Size]],
+            [[DoubleSize]],
             connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
         )
     ).
@@ -633,6 +643,7 @@ t_auto_create_simple_insert(Config0) ->
     SentData = #{
         payload => ?PAYLOAD,
         timestamp => 1668602148000,
+        second_ts => 1668602148000 + 100,
         clientid => ClientId
     },
     Request = {send_message, SentData},
@@ -647,9 +658,19 @@ t_auto_create_simple_insert(Config0) ->
         connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
     ),
 
+    ?assertMatch(
+        [[?PAYLOAD]],
+        connect_and_query(Config, "SELECT payload FROM test_" ++ ClientId)
+    ),
+
     ?assertMatch(
         [[0]],
         connect_and_query(Config, "DROP TABLE " ++ ClientId)
+    ),
+
+    ?assertMatch(
+        [[0]],
+        connect_and_query(Config, "DROP TABLE test_" ++ ClientId)
     ).
 
 t_auto_create_batch_insert(Config0) ->
@@ -675,6 +696,7 @@ t_auto_create_batch_insert(Config0) ->
                             SentData = #{
                                 payload => ?PAYLOAD,
                                 timestamp => Ts + Idx + Offset,
+                                second_ts => Ts + Idx + Offset + 5000,
                                 clientid => ClientId
                             },
                             Request = {send_message, SentData},
@@ -693,29 +715,28 @@ t_auto_create_batch_insert(Config0) ->
         _Sleep = 50,
         _Attempts = 30,
 
-        ?assertMatch(
-            [[Size1]],
-            connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
-        )
-    ),
-
-    ?retry(
-        50,
-        30,
-        ?assertMatch(
-            [[Size2]],
-            connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
+        lists:foreach(
+            fun({Table, Size}) ->
+                ?assertMatch(
+                    [[Size]],
+                    connect_and_query(Config, "SELECT COUNT(1) FROM " ++ Table)
+                )
+            end,
+            lists:zip(
+                [ClientId1, "test_" ++ ClientId1, ClientId2, "test_" ++ ClientId2],
+                [Size1, Size1, Size2, Size2]
+            )
         )
     ),
 
-    ?assertMatch(
-        [[0]],
-        connect_and_query(Config, "DROP TABLE " ++ ClientId1)
-    ),
-
-    ?assertMatch(
-        [[0]],
-        connect_and_query(Config, "DROP TABLE " ++ ClientId2)
+    lists:foreach(
+        fun(E) ->
+            ?assertMatch(
+                [[0]],
+                connect_and_query(Config, "DROP TABLE " ++ E)
+            )
+        end,
+        [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2]
     ).
 
 to_bin(List) when is_list(List) ->

+ 20 - 0
changes/ee/fix-11266.en.md

@@ -0,0 +1,20 @@
+Fix and improve support for TDEngine `insert` syntax.
+
+1. Support inserting into multi-table in the template
+
+   For example:
+
+   `insert into table_1 values (${ts}, '${id}', '${topic}')
+   table_2 values (${ts}, '${id}', '${topic}')`
+
+2. Support mixing prefixes/suffixes and placeholders in the template
+
+   For example:
+
+   `insert into table_${topic} values (${ts}, '${id}', '${topic}')`
+
+Note: This is a breaking change, at the former, the string-type values are automatically quoted, bu now, you should manually quote them.
+
+For example:
+
+`insert into table values (${ts}, '${a_string}')`