Przeglądaj źródła

Merge pull request #14419 from JimMoen/1216-fix-EMQX-13521

fix: clickhouse split sql re
JimMoen 1 rok temu
rodzic
commit
1647216d4b

+ 80 - 19
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -51,6 +51,10 @@
     execute_sql_in_clickhouse_server_using_connection/2
 ]).
 
+-ifdef(TEST).
+-export([split_clickhouse_insert_sql/1]).
+-endif.
+
 %%=====================================================================
 %% Types
 %%=====================================================================
@@ -76,6 +80,75 @@
 
 -type clickhouse_config() :: map().
 
+%%=====================================================================
+%% Macros and On load
+%%=====================================================================
+
+%% Copied from emqx_utils_sql:parse_insert/1
+%% Can also handle Clickhouse's SQL extension for INSERT statements that allows the
+%% user to specify different formats:
+%%
+%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
+%%
+%% persistent_term key under which the result of compiling ?INSERT_RE_BIN is cached.
+-define(INSERT_RE_MP_KEY, {?MODULE, insert_re_mp}).
+%% Regular expression that splits an INSERT template into its `INSERT INTO ...`
+%% prefix (group 1) and either the `VALUES` tuples (group 2) or the data that
+%% follows `FORMAT <FORMAT_NAME>` (group 3).
+-define(INSERT_RE_BIN, <<
+    %% case-insensitive
+    "(?i)^",
+    %% Leading spaces
+    "\\s*",
+    %% Group-1: insert into, table name and columns (when present).
+    %% All space characters following <TABLE_NAME> are kept
+    %% `INSERT INTO <TABLE_NAME> [(<COLUMN>, ..)]`
+    "(insert\\s+into\\s+[^\\s\\(\\)]+\\s*(?:(?:\\((?:[^()]++|(?2))*\\)\\s*,?\\s*)*))",
+    "\\s*",
+    %% Non-capturing group holding the two alternatives below
+    "(?:",
+    %% Group-2 (not set when the FORMAT clause is used):
+    %% literal value(s) or placeholder(s) in round brackets.
+    %% The sub-patterns inside the brackets do not capture anything.
+    %% Not captured:
+    %%     the `VALUES` keyword itself
+    %% Captured:
+    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ..)]`
+    %% The quantifier `*` must apply to the whole alternation in every tuple
+    %% (not only to the recursion `(?2)`), otherwise a second or later tuple
+    %% mixing plain text and nested brackets, e.g. `(3, now())`, cannot match.
+    "(?:values\\s*(\\((?:[^()]++|(?2))*\\)(?:\\s*,\\s*\\((?:[^()]++|(?2))*\\))*))",
+    %% End Group-2
+    %% or
+    "|",
+    %% Group-3:
+    %% literal value(s) or placeholder(s) as `<FORMAT_DATA>`
+    %% Not captured:
+    %%     `FORMAT <FORMAT_NAME>`
+    %% Captured: `<FORMAT_DATA>`, taken as-is without any check
+    %%   Could be:
+    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]`
+    %%     `[([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]]`
+    %%     ...
+    "(?:format\\s+[a-zA-Z]+\\s+)((?!\\s)(?=.*\\s).*)",
+    %% End Group-3
+    ")",
+    %% End of the non-capturing group
+    "\\s*$"
+>>).
+
+-on_load(on_load/0).
+
+%% Module load hook: stores the compiled INSERT regex before the module is used.
+on_load() ->
+    ok = put_insert_mp().
+
+%% Compiles ?INSERT_RE_BIN and stores the result of re:compile/1 (as returned)
+%% in persistent_term under ?INSERT_RE_MP_KEY.
+put_insert_mp() ->
+    CompileResult = re:compile(?INSERT_RE_BIN),
+    persistent_term:put(?INSERT_RE_MP_KEY, CompileResult),
+    ok.
+
+%% Returns `{ok, MP}` with the cached compiled INSERT regex.
+%% When nothing is stored yet, the regex is compiled and stored first.
+get_insert_mp() ->
+    case persistent_term:get(?INSERT_RE_MP_KEY, undefined) of
+        {ok, _MP} = Cached ->
+            Cached;
+        undefined ->
+            ok = put_insert_mp(),
+            get_insert_mp()
+    end.
+
 %%=====================================================================
 %% Configuration and default values
 %%=====================================================================
@@ -220,33 +293,21 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
     ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
     emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
 
-%% This function is similar to emqx_utils_sql:parse_insert/1 but can
-%% also handle Clickhouse's SQL extension for INSERT statments that allows the
-%% user to specify different formats:
-%%
-%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
-%%
 split_clickhouse_insert_sql(SQL) -> % Returns the values / format-data part of an INSERT template
     ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
-    case
-        re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
-    of
-        [Part1, _, Part3] ->
-            case string:trim(Part1, leading) of
-                <<"insert", _/binary>> ->
-                    Part3;
-                <<"INSERT", _/binary>> ->
-                    Part3;
-                _ ->
-                    erlang:error(ErrorMsg)
-            end;
+    {ok, MP} = get_insert_mp(), % compiled regex obtained via get_insert_mp/0
+    case re:run(SQL, MP, [{capture, all_but_first, binary}]) of % capture groups only, as binaries
+        {match, [_InsertInto, ValuesTemplate]} ->
+            ValuesTemplate;
+        %% Group 2 (the `VALUES` tuples) came back empty, so this is not a `VALUES`
+        %% statement; group 3 holds the data following `FORMAT <FORMAT_NAME>`
+        {match, [_InsertInto, <<>>, FormatTemplate]} ->
+            FormatTemplate;
         _ -> % anything else is not a supported INSERT template
             erlang:error(ErrorMsg)
     end.
 
 % This is a callback for ecpool which is triggered by the call to
 % emqx_resource_pool:start in on_start/2
-
 connect(Options) ->
     URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
     User = proplists:get_value(user, Options),

+ 123 - 0
apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl

@@ -136,6 +136,9 @@ clickhouse_url() ->
     Port = clickhouse_port(),
     erlang:iolist_to_binary(["http://", Host, ":", Port]).
 
+%% Test shorthand for the connector's INSERT template splitter.
+parse_insert(InsertSQL) ->
+    emqx_bridge_clickhouse_connector:split_clickhouse_insert_sql(InsertSQL).
+
 clickhouse_config(Config) ->
     SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
     BatchSeparator = maps:get(batch_value_separator, Config, <<", ">>),
@@ -245,6 +248,126 @@ t_make_delete_bridge(_Config) ->
     false = lists:any(IsRightName, BridgesAfterDelete),
     ok.
 
+%% Checks which part of an INSERT template the connector extracts, and that
+%% templates without any format data are rejected.
+t_parse_insert_sql_template(_Config) ->
+    %% Each entry is {ExpectedTemplate, InsertSQL}; entries are checked in order.
+    Accepted = [
+        {
+            <<"(${tagvalues},${date})"/utf8>>,
+            <<"insert into tag_VALUES(tag_values,Timestamp) values (${tagvalues},${date})"/utf8>>
+        },
+        {
+            <<"(${id}, 'Иван', 25)"/utf8>>,
+            <<"INSERT INTO Values_таблица (идентификатор, имя, возраст)   VALUES \t (${id}, 'Иван', 25)  "/utf8>>
+        },
+        %% `values` in column name
+        {
+            <<"(${tagvalues},${date}  )"/utf8>>,
+            <<"insert into PI.dbo.tags(tag_values,Timestamp) values (${tagvalues},${date}  )"/utf8>>
+        },
+        {
+            <<"(${payload}, FROM_UNIXTIME((${timestamp}/1000)))">>,
+            <<"INSERT INTO mqtt_test(payload, arrived) VALUES (${payload}, FROM_UNIXTIME((${timestamp}/1000)))"/utf8>>
+        },
+        {
+            <<"(${id},'Алексей',30)"/utf8>>,
+            <<"insert into таблица (идентификатор,имя,возраст) VALUES(${id},'Алексей',30)"/utf8>>
+        },
+        {
+            <<"(${id}, '张三', 22)"/utf8>>,
+            <<"INSERT into 表格 (标识, 名字, 年龄) VALUES (${id}, '张三', 22)"/utf8>>
+        },
+        {
+            <<"(${id},'李四', 35)"/utf8>>,
+            <<"  inSErt into 表格(标识,名字,年龄)values(${id},'李四', 35)"/utf8>>
+        },
+        {
+            <<"(   ${tagvalues},   ${date} )"/utf8>>,
+            <<"insert into PI.dbo.tags( tag_value,Timestamp)  VALUES\t\t(   ${tagvalues},   ${date} )"/utf8>>
+        },
+        {
+            <<"(${tagvalues},${date})"/utf8>>,
+            <<"insert into PI.dbo.tags(tag_value , Timestamp )vALues(${tagvalues},${date})"/utf8>>
+        },
+        {
+            <<"(${one}, ${two},${three})"/utf8>>,
+            <<"inSErt  INTO  table75 (column1, column2, column3) values (${one}, ${two},${three})"/utf8>>
+        },
+        {
+            <<"(${tag1},   ${tag2}  )">>,
+            <<"INSERT Into some_table      values\t(${tag1},   ${tag2}  )">>
+        },
+        {
+            <<"(2, 2)">>,
+            <<"INSERT INTO insert_select_testtable (* EXCEPT(b)) Values (2, 2)">>
+        },
+        {
+            <<"(2, 2), (3, ${five})">>,
+            <<"INSERT INTO insert_select_testtable (* EXCEPT(b))Values(2, 2), (3, ${five})">>
+        },
+        %% `format`
+        {
+            <<"[(${key}, \"${data}\", ${timestamp})]">>,
+            <<"INSERT INTO mqtt_test(key, data, arrived)",
+                " FORMAT JSONCompactEachRow [(${key}, \"${data}\", ${timestamp})]">>
+        },
+        {
+            <<"(v11, v12, v13), (v21, v22, v23)">>,
+            <<"INSERT INTO   mqtt_test(key, data, arrived) FORMAT Values (v11, v12, v13), (v21, v22, v23)">>
+        },
+        %% Only check if FORMAT_DATA existed after `FORMAT FORMAT_NAME`
+        {
+            <<"👋    .."/utf8>>,
+            <<"INSERT INTO   mqtt_test(key, data, arrived) FORMAT AnyFORMAT  👋    .."/utf8>>
+        }
+    ],
+    lists:foreach(
+        fun({Expected, InsertSQL}) ->
+            ?assertEqual(Expected, parse_insert(InsertSQL))
+        end,
+        Accepted
+    ),
+
+    ErrMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
+    %% No `FORMAT_DATA`
+    Rejected = [
+        <<"INSERT INTO   mqtt_test(key, data, arrived) FORMAT Values">>,
+        <<"INSERT INTO   mqtt_test(key, data, arrived) FORMAT Values  ">>
+    ],
+    lists:foreach(
+        fun(InsertSQL) ->
+            ?assertError(ErrMsg, parse_insert(InsertSQL))
+        end,
+        Rejected
+    ).
+
 t_send_message_query(Config) ->
     BridgeID = make_bridge(#{enable_batch => false}),
     Key = 42,

+ 2 - 0
changes/fix-14419.en.md

@@ -0,0 +1,2 @@
+Fixed an issue in the ClickHouse data integration where the INSERT SQL template could not be parsed correctly.
+The `VALUES` keyword no longer needs to be surrounded by spaces.