JimMoen 1 год назад
Родитель
Сommit
912c61584c
1 измененных файлов с 80 добавлено и 19 удалено
  1. 80 19
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

+ 80 - 19
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -51,6 +51,10 @@
     execute_sql_in_clickhouse_server_using_connection/2
 ]).
 
+-ifdef(TEST).
+-export([split_clickhouse_insert_sql/1]).
+-endif.
+
 %%=====================================================================
 %% Types
 %%=====================================================================
@@ -76,6 +80,75 @@
 
 -type clickhouse_config() :: map().
 
+%%=====================================================================
+%% Macros and On load
+%%=====================================================================
+
+%% Copied from emqx_utils_sql:parse_insert/1
+%% Can also handle Clickhouse's SQL extension for INSERT statments that allows the
+%% user to specify different formats:
+%%
+%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
+%%
+-define(INSERT_RE_MP_KEY, {?MODULE, insert_re_mp}).
+-define(INSERT_RE_BIN, <<
+    %% case-insensitive
+    "(?i)^",
+    %% Leading spaces
+    "\\s*",
+    %% Group-1: insert into, table name and columns (when existed).
+    %% All space characters suffixed to <TABLE_NAME> will be kept
+    %% `INSERT INTO <TABLE_NAME> [(<COLUMN>, ..)]`
+    "(insert\\s+into\\s+[^\\s\\(\\)]+\\s*(?:(?:\\((?:[^()]++|(?2))*\\)\\s*,?\\s*)*))",
+    "\\s*",
+    %% Ignore Group
+    "(?:",
+    %% Group-2 (Optional for FORMAT clause):
+    %% literals value(s) or placeholder(s) with round brackets.
+    %% And the sub-pattern in brackets does not do any capturing
+    %% Ignore Group:
+    %%     `VALUES [([<VALUE> | <PLACEHOLDER>], ...)]`
+    %% Keep Capturing-Group:
+    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ..)]`
+    "(?:values\\s*(\\((?:[^()]++|(?2))*\\)(?:\\s*,\\s*\\((?:[^()]++|(?2)*)\\))*))",
+    %% End Group-2
+    %% or
+    "|",
+    %% Group-3:
+    %% literals value(s) or placeholder(s) as `<FORMAT_DATA>`
+    %% Ignore Group:
+    %%     `FORMAT <FORMAT_NAME> <FORMAT_DATA>`
+    %% Keep Capturing-Group `<FORMAT_DATA>` without any check
+    %%   Could be:
+    %%     `([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]`
+    %%     `[([<VALUE> | <PLACEHOLDER>], ...) [, ([<VALUE> | <PLACEHOLDER>], ...)]]`
+    %%     ...
+    "(?:format\\s+[a-zA-Z]+\\s+)((?!\\s)(?=.*\\s).*)",
+    %% End Group-3
+    ")",
+    %% End Ignored Group
+    "\\s*$"
+>>).
+
+-on_load(on_load/0).
+
+on_load() ->
+    put_insert_mp(),
+    ok.
+
+put_insert_mp() ->
+    persistent_term:put(?INSERT_RE_MP_KEY, re:compile(?INSERT_RE_BIN)),
+    ok.
+
+get_insert_mp() ->
+    case persistent_term:get(?INSERT_RE_MP_KEY, undefined) of
+        undefined ->
+            ok = put_insert_mp(),
+            get_insert_mp();
+        {ok, MP} ->
+            {ok, MP}
+    end.
+
 %%=====================================================================
 %% Configuration and default values
 %%=====================================================================
@@ -220,33 +293,21 @@ prepare_sql_bulk_extend_template(Template, Separator) ->
     ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
     emqx_placeholder:preproc_tmpl(ExtendParamTemplate).
 
-%% This function is similar to emqx_utils_sql:parse_insert/1 but can
-%% also handle Clickhouse's SQL extension for INSERT statments that allows the
-%% user to specify different formats:
-%%
-%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
-%%
 split_clickhouse_insert_sql(SQL) ->
     ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
-    case
-        re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
-    of
-        [Part1, _, Part3] ->
-            case string:trim(Part1, leading) of
-                <<"insert", _/binary>> ->
-                    Part3;
-                <<"INSERT", _/binary>> ->
-                    Part3;
-                _ ->
-                    erlang:error(ErrorMsg)
-            end;
+    {ok, MP} = get_insert_mp(),
+    case re:run(SQL, MP, [{capture, all_but_first, binary}]) of
+        {match, [_InsertInto, ValuesTemplate]} ->
+            ValuesTemplate;
+        %% Group2 is empty (not `VALUES` statement)
+        {match, [_InsertInto, <<>>, FormatTemplate]} ->
+            FormatTemplate;
         _ ->
             erlang:error(ErrorMsg)
     end.
 
 % This is a callback for ecpool which is triggered by the call to
 % emqx_resource_pool:start in on_start/2
-
 connect(Options) ->
     URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
     User = proplists:get_value(user, Options),