|
|
@@ -124,7 +124,7 @@ on_query(InstanceId, {query, SQL}, State) ->
|
|
|
on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
|
|
|
case maps:find(Key, InsertTksMap) of
|
|
|
{ok, Tokens} when is_map(Data) ->
|
|
|
- SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
|
|
|
+ SQL = emqx_placeholder:proc_tmpl(Tokens, Data),
|
|
|
do_query(InstanceId, SQL, State);
|
|
|
_ ->
|
|
|
{error, {unrecoverable_error, invalid_request}}
|
|
|
@@ -209,31 +209,16 @@ execute(Conn, Query, Opts) ->
|
|
|
tdengine:insert(Conn, Query, Opts).
|
|
|
|
|
|
do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
|
|
- Queries = aggregate_query(Tokens, BatchReqs),
|
|
|
- SQL = maps:fold(
|
|
|
- fun(InsertPart, Values, Acc) ->
|
|
|
- lists:foldl(
|
|
|
- fun(ValuePart, IAcc) ->
|
|
|
- <<IAcc/binary, " ", ValuePart/binary>>
|
|
|
- end,
|
|
|
- <<Acc/binary, " ", InsertPart/binary, " VALUES">>,
|
|
|
- Values
|
|
|
- )
|
|
|
- end,
|
|
|
- <<"INSERT INTO">>,
|
|
|
- Queries
|
|
|
- ),
|
|
|
+ SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
|
|
|
execute(Conn, SQL, Opts).
|
|
|
|
|
|
-aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
|
|
|
+aggregate_query(BatchTks, BatchReqs, Acc) ->
|
|
|
lists:foldl(
|
|
|
- fun({_, Data}, Acc) ->
|
|
|
- InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
|
|
|
- ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
|
|
|
- Values = maps:get(InsertPart, Acc, []),
|
|
|
- maps:put(InsertPart, [ParamsPart | Values], Acc)
|
|
|
+ fun({_, Data}, InAcc) ->
|
|
|
+ InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data),
|
|
|
+ <<InAcc/binary, " ", InsertPart/binary>>
|
|
|
end,
|
|
|
- #{},
|
|
|
+ Acc,
|
|
|
BatchReqs
|
|
|
).
|
|
|
|
|
|
@@ -260,13 +245,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
|
|
|
InsertTks = emqx_placeholder:preproc_tmpl(H),
|
|
|
H1 = string:trim(H, trailing, ";"),
|
|
|
case split_insert_sql(H1) of
|
|
|
- [_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
|
|
|
- InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
|
|
|
- ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
|
|
|
+ [_InsertPart, BatchDesc] ->
|
|
|
+ BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc),
|
|
|
parse_batch_prepare_sql(
|
|
|
T,
|
|
|
InsertTksMap#{Key => InsertTks},
|
|
|
- BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
|
|
|
+ BatchTksMap#{Key => BatchTks}
|
|
|
);
|
|
|
Result ->
|
|
|
?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
|
|
|
@@ -299,7 +283,7 @@ split_insert_sql(SQL0) ->
|
|
|
{true, E1}
|
|
|
end
|
|
|
end,
|
|
|
- re:split(SQL, "(?i)(insert into)|(?i)(values)")
|
|
|
+ re:split(SQL, "(?i)(insert into)")
|
|
|
).
|
|
|
|
|
|
formalize_sql(Input) ->
|