
fix(tdengine): add support for the `automatically create` feature in the SQL template

firest, 2 years ago
Commit 6ff77b221b

+ 91 - 43
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -25,7 +25,7 @@
     on_get_status/2
 ]).
 
--export([connect/1, do_get_status/1, execute/3]).
+-export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]).
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -124,31 +124,35 @@ on_stop(InstanceId, #{pool_name := PoolName}) ->
 
 on_query(InstanceId, {query, SQL}, State) ->
     do_query(InstanceId, SQL, State);
-on_query(InstanceId, Request, State) ->
-    %% because the `emqx-tdengine` client only supports a single SQL cmd
-    %% so the `on_query` and `on_batch_query` have the same process, that is:
-    %% we need to collect all data into one SQL cmd and then call the insert API
-    on_batch_query(InstanceId, [Request], State).
+on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
+    case maps:find(Key, InsertTksMap) of
+        {ok, Tokens} ->
+            SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data),
+            do_query(InstanceId, SQL, State);
+        _ ->
+            {error, {unrecoverable_error, invalid_request}}
+    end.
 
+%% Aggregating the batch queries into one SQL statement is a heavy job, so we run it in the worker process
 on_batch_query(
     InstanceId,
-    BatchReq,
-    #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
+    [{Key, _} | _] = BatchReq,
+    #{batch_tokens := BatchTksMap, query_opts := Opts} = State
 ) ->
-    case hd(BatchReq) of
-        {Key, _} ->
-            case maps:get(Key, Inserts, undefined) of
-                undefined ->
-                    {error, {unrecoverable_error, batch_prepare_not_implemented}};
-                InsertSQL ->
-                    Tokens = maps:get(Key, ParamsTokens),
-                    do_batch_insert(InstanceId, BatchReq, InsertSQL, Tokens, State)
-            end;
-        Request ->
-            LogMeta = #{connector => InstanceId, first_request => Request, state => State},
-            ?SLOG(error, LogMeta#{msg => "invalid request"}),
-            {error, {unrecoverable_error, invalid_request}}
-    end.
+    case maps:find(Key, BatchTksMap) of
+        {ok, Tokens} ->
+            do_query_job(
+                InstanceId,
+                {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]},
+                State
+            );
+        _ ->
+            {error, {unrecoverable_error, batch_prepare_not_implemented}}
+    end;
+on_batch_query(InstanceId, BatchReq, State) ->
+    LogMeta = #{connector => InstanceId, request => BatchReq, state => State},
+    ?SLOG(error, LogMeta#{msg => "invalid request"}),
+    {error, {unrecoverable_error, invalid_request}}.
 
 on_get_status(_InstanceId, #{pool_name := PoolName}) ->
     Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
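
For a single request, on_query/3 now renders the pre-processed tokens of the configured insert template against the message data and sends the result as one SQL statement. A minimal sketch of that path, assuming a hypothetical auto-create template (the super-table name `msg` and the placeholders below are made up for illustration):

    %% done once at bridge setup by parse_prepare_sql/1:
    Tokens = emqx_plugin_libs_rule:preproc_tmpl(
        <<"INSERT INTO ${clientid} USING msg TAGS (${clientid}) VALUES (${ts}, ${payload})">>
    ),
    %% done per request by on_query/3 with the incoming message data;
    %% SQL is then handed to do_query/3, which runs it via execute/3 -> tdengine:insert/3
    Data = #{<<"clientid">> => <<"c1">>, <<"ts">> => 1692000000000, <<"payload">> => <<"hello">>},
    SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data).
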
@@ -167,17 +171,16 @@ status_result(_Status = false) -> connecting.
 %% Helper fns
 %%========================================================================================
 
-do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) ->
-    SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens),
-    do_query(InstanceId, SQL, State).
+do_query(InstanceId, Query, #{query_opts := Opts} = State) ->
+    do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State).
 
-do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) ->
+do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
     ?TRACE(
         "QUERY",
         "tdengine_connector_received",
-        #{connector => InstanceId, query => Query, state => State}
+        #{connector => InstanceId, job => Job, state => State}
     ),
-    Result = ecpool:pick_and_do(PoolName, {?MODULE, execute, [Query, Opts]}, no_handover),
+    Result = ecpool:pick_and_do(PoolName, Job, no_handover),
 
     case Result of
         {error, Reason} ->
@@ -188,7 +191,7 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
             ?SLOG(error, #{
                 msg => "tdengine_connector_do_query_failed",
                 connector => InstanceId,
-                query => Query,
+                job => Job,
                 reason => Reason
             }),
             Result;
@@ -203,6 +206,37 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
 execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).
 
+do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
+    Queries = aggregate_query(Tokens, BatchReqs),
+    SQL = lists:foldl(
+        fun({InsertPart, Values}, Acc) ->
+            lists:foldl(
+                fun(ValuePart, IAcc) ->
+                    <<IAcc/binary, " ", ValuePart/binary>>
+                end,
+                <<Acc/binary, " ", InsertPart/binary, " VALUES">>,
+                Values
+            )
+        end,
+        <<"INSERT INTO">>,
+        Queries
+    ),
+    execute(Conn, SQL, Opts).
+
+aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
+    maps:to_list(
+        lists:foldl(
+            fun({_, Data}, Acc) ->
+                InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
+                ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data),
+                Values = maps:get(InsertPart, Acc, []),
+                maps:put(InsertPart, [ParamsPart | Values], Acc)
+            end,
+            #{},
+            BatchReqs
+        )
+    ).
+
 connect(Opts) ->
     tdengine:start_link(Opts).
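
Note that ecpool:pick_and_do/3 applies the {M, F, A} job with the pooled connection prepended to the argument list, which is why execute/3 and do_batch_insert/4 take the connection as their first argument. aggregate_query/2 groups the rendered VALUES fragments by their rendered insert clause, so all rows that target the same (possibly auto-created) sub-table share one clause, and do_batch_insert/4 folds the groups into a single multi-table insert. A rough sketch, assuming the hypothetical template above and a batch of two messages from client c1 and one from c2:

    %% Queries = [{<<"c1 USING msg TAGS (...)">>, [<<"(ts2, ...)">>, <<"(ts1, ...)">>]},
    %%            {<<"c2 USING msg TAGS (...)">>, [<<"(ts3, ...)">>]}]
    %%
    %% which folds into one statement of roughly this shape (a single line,
    %% wrapped here for readability):
    %%
    %%   INSERT INTO c1 USING msg TAGS (...) VALUES (ts2, ...) (ts1, ...)
    %%               c2 USING msg TAGS (...) VALUES (ts3, ...)
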
 
@@ -218,32 +252,46 @@ parse_prepare_sql(Config) ->
 
     parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
 
-parse_batch_prepare_sql([{Key, H} | T], BatchInserts, BatchTks) ->
+parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
     case emqx_plugin_libs_rule:detect_sql_type(H) of
         {ok, select} ->
-            parse_batch_prepare_sql(T, BatchInserts, BatchTks);
+            parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
         {ok, insert} ->
-            case emqx_plugin_libs_rule:split_insert_sql(H) of
-                {ok, {InsertSQL, Params}} ->
-                    ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params),
+            InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H),
+            H1 = string:trim(H, trailing, ";"),
+            case split_insert_sql(H1) of
+                [_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
+                    InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart),
+                    ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart),
                     parse_batch_prepare_sql(
                         T,
-                        BatchInserts#{Key => InsertSQL},
-                        BatchTks#{Key => ParamsTks}
+                        InsertTksMap#{Key => InsertTks},
+                        BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
                     );
-                {error, Reason} ->
-                    ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
-                    parse_batch_prepare_sql(T, BatchInserts, BatchTks)
+                _ ->
+                    ?SLOG(error, #{msg => "split sql failed", sql => H}),
+                    parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
             end;
         {error, Reason} ->
             ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
-            parse_batch_prepare_sql(T, BatchInserts, BatchTks)
+            parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
     end;
-parse_batch_prepare_sql([], BatchInserts, BatchTks) ->
+parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) ->
     #{
-        batch_inserts => BatchInserts,
-        batch_params_tokens => BatchTks
+        insert_tokens => InsertTksMap,
+        batch_tokens => BatchTksMap
     }.
 
 to_bin(List) when is_list(List) ->
     unicode:characters_to_binary(List, utf8).
+
+split_insert_sql(SQL0) ->
+    SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
+    lists:foldr(
+        fun
+            (<<>>, Acc) -> Acc;
+            (E, Acc) -> [string:trim(E) | Acc]
+        end,
+        [],
+        re:split(SQL, "(?i)(insert into)|(?i)(values)")
+    ).

+ 10 - 2
apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl

@@ -32,7 +32,8 @@
     proc_cql_param_str/2,
     split_insert_sql/1,
     detect_sql_type/1,
-    proc_batch_sql/3
+    proc_batch_sql/3,
+    formalize_sql/1
 ]).
 
 %% type converting
@@ -126,7 +127,8 @@ proc_cql_param_str(Tokens, Data) ->
 -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
     InsertSQL :: binary(),
     Params :: binary().
-split_insert_sql(SQL) ->
+split_insert_sql(SQL0) ->
+    SQL = formalize_sql(SQL0),
     case re:split(SQL, "((?i)values)", [{return, binary}]) of
         [Part1, _, Part3] ->
             case string:trim(Part1, leading) of
@@ -173,6 +175,12 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
     ),
     <<InsertPart/binary, " values ", ValuesPart/binary>>.
 
+formalize_sql(Input) ->
+    %% 1. Replace all whitespace characters (e.g. '\r', '\n', spaces) with a single space.
+    SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
+    %% 2. Trim the result.
+    string:trim(SQL).
+
 unsafe_atom_key(Key) when is_atom(Key) ->
     Key;
 unsafe_atom_key(Key) when is_binary(Key) ->
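
formalize_sql/1 only normalizes the template before splitting; a quick sketch of its effect on a made-up template:

    formalize_sql(<<"  INSERT INTO t\n    VALUES\r\n  (${ts}, ${val})  ">>).
    %% => <<"INSERT INTO t VALUES (${ts}, ${val})">>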