Ver código fonte

feat(bridge): add `on_batch_query` on emqx_connector_mysql

firest 3 anos atrás
pai
commit
8d8afd1688

+ 122 - 32
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -28,6 +28,7 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_batch_query/3,
     on_get_status/2
 ]).
 
@@ -47,12 +48,15 @@
 
 -type prepares() :: #{atom() => binary()}.
 -type params_tokens() :: #{atom() => list()}.
+-type sqls() :: #{atom() => binary()}.
 -type state() ::
     #{
         poolname := atom(),
-        prepare_statement := prepares(),
         auto_reconnect := boolean(),
-        params_tokens := params_tokens()
+        prepare_statement := prepares(),
+        params_tokens := params_tokens(),
+        batch_inserts := sqls(),
+        batch_params_tokens := params_tokens()
     }.
 
 %%=====================================================================
@@ -134,48 +138,46 @@ on_query(
     {TypeOrKey, SQLOrKey, Params, Timeout},
     #{poolname := PoolName, prepare_statement := Prepares} = State
 ) ->
-    LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
-    ?TRACE("QUERY", "mysql_connector_received", LogMeta),
-    Worker = ecpool:get_client(PoolName),
-    {ok, Conn} = ecpool_worker:client(Worker),
     MySqlFunction = mysql_function(TypeOrKey),
     {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
-    Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]),
-    case Result of
-        {error, disconnected} ->
-            ?SLOG(
-                error,
-                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
-            ),
-            %% kill the poll worker to trigger reconnection
-            _ = exit(Conn, restart),
-            Result;
+    case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
         {error, not_prepared} ->
-            ?SLOG(
-                warning,
-                LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
-            ),
             case prepare_sql(Prepares, PoolName) of
                 ok ->
                     %% not return result, next loop will try again
                     on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
                 {error, Reason} ->
+                    LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
                     ?SLOG(
                         error,
                         LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
                     ),
                     {error, Reason}
             end;
-        {error, Reason} ->
-            ?SLOG(
-                error,
-                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
-            ),
-            Result;
-        _ ->
+        Result ->
             Result
     end.
 
+on_batch_query(
+    InstId,
+    BatchReq,
+    #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
+) ->
+    case hd(BatchReq) of
+        {Key, _} ->
+            case maps:get(Key, Inserts, undefined) of
+                undefined ->
+                    {error, batch_select_not_implemented};
+                InsertSQL ->
+                    Tokens = maps:get(Key, ParamsTokens),
+                    on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
+            end;
+        Request ->
+            LogMeta = #{connector => InstId, first_request => Request, state => State},
+            ?SLOG(error, LogMeta#{msg => "invalid request"}),
+            {error, invald_request}
+    end.
+
 mysql_function(sql) ->
     query;
 mysql_function(prepared_query) ->
@@ -316,13 +318,44 @@ parse_prepare_sql(Config) ->
             Any ->
                 Any
         end,
-    parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
+    parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
 
-parse_prepare_sql([{Key, H} | T], SQL, Tokens) ->
+parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
     {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
-    parse_prepare_sql(T, SQL#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens});
-parse_prepare_sql([], SQL, Tokens) ->
-    #{prepare_statement => SQL, params_tokens => Tokens}.
+    parse_batch_prepare_sql(
+        L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
+    );
+parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
+    #{
+        prepare_statement => Prepares,
+        params_tokens => Tokens,
+        batch_inserts => BatchInserts,
+        batch_params_tokens => BatchTks
+    }.
+
+parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
+    case emqx_plugin_libs_rule:detect_sql_type(H) of
+        {ok, select} ->
+            parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
+        {ok, insert} ->
+            case emqx_plugin_libs_rule:split_insert_sql(H) of
+                {ok, {InsertSQL, Params}} ->
+                    ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params),
+                    parse_prepare_sql(
+                        T,
+                        Prepares,
+                        Tokens,
+                        BatchInserts#{Key => InsertSQL},
+                        BatchTks#{Key => ParamsTks}
+                    );
+                {error, Reason} ->
+                    ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
+                    parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
+            end;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
+            parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
+    end.
 
 proc_sql_params(query, SQLOrKey, Params, _State) ->
     {SQLOrKey, Params};
@@ -335,3 +368,60 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
         Tokens ->
             {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
     end.
+
+on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
+    JoinFun = fun
+        ([Msg]) ->
+            emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg);
+        ([H | T]) ->
+            lists:foldl(
+                fun(Msg, Acc) ->
+                    Value = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg),
+                    <<Acc/binary, ", ", Value/binary>>
+                end,
+                emqx_plugin_libs_rule:proc_sql_param_str(Tokens, H),
+                T
+            )
+    end,
+    {_, Msgs} = lists:unzip(BatchReqs),
+    JoinPart = JoinFun(Msgs),
+    SQL = <<InsertPart/binary, " values ", JoinPart/binary>>,
+    on_sql_query(InstId, query, SQL, [], default_timeout, State).
+
+on_sql_query(
+    InstId,
+    SQLFunc,
+    SQLOrKey,
+    Data,
+    Timeout,
+    #{poolname := PoolName} = State
+) ->
+    LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
+    ?TRACE("QUERY", "mysql_connector_received", LogMeta),
+    Worker = ecpool:get_client(PoolName),
+    {ok, Conn} = ecpool_worker:client(Worker),
+    Result = erlang:apply(mysql, SQLFunc, [Conn, SQLOrKey, Data, Timeout]),
+    case Result of
+        {error, disconnected} ->
+            ?SLOG(
+                error,
+                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
+            ),
+            %% kill the poll worker to trigger reconnection
+            _ = exit(Conn, restart),
+            Result;
+        {error, not_prepared} = Error ->
+            ?SLOG(
+                warning,
+                LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
+            ),
+            Error;
+        {error, Reason} ->
+            ?SLOG(
+                error,
+                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
+            ),
+            Result;
+        _ ->
+            Result
+    end.

+ 40 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl

@@ -29,7 +29,9 @@
     preproc_sql/2,
     proc_sql/2,
     proc_sql_param_str/2,
-    proc_cql_param_str/2
+    proc_cql_param_str/2,
+    split_insert_sql/1,
+    detect_sql_type/1
 ]).
 
 %% type converting
@@ -123,6 +125,43 @@ proc_sql_param_str(Tokens, Data) ->
 proc_cql_param_str(Tokens, Data) ->
     emqx_placeholder:proc_cql_param_str(Tokens, Data).
 
+%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
+-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
+    InsertSQL :: binary(),
+    Params :: binary().
+split_insert_sql(SQL) ->
+    case re:split(SQL, "((?i)values)", [{return, binary}]) of
+        [Part1, _, Part3] ->
+            case string:trim(Part1, leading) of
+                <<"insert", _/binary>> = InsertSQL ->
+                    {ok, {InsertSQL, Part3}};
+                <<"INSERT", _/binary>> = InsertSQL ->
+                    {ok, {InsertSQL, Part3}};
+                _ ->
+                    {error, not_insert_sql}
+            end;
+        _ ->
+            {error, not_insert_sql}
+    end.
+
+-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
+    Type :: insert | select.
+detect_sql_type(SQL) ->
+    case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
+        {match, [First]} ->
+            Types = [select, insert],
+            PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
+            LowFirst = string:lowercase(First),
+            case proplists:lookup(LowFirst, PropTypes) of
+                {LowFirst, Type} ->
+                    {ok, Type};
+                _ ->
+                    {error, invalid_sql}
+            end;
+        _ ->
+            {error, invalid_sql}
+    end.
+
 unsafe_atom_key(Key) when is_atom(Key) ->
     Key;
 unsafe_atom_key(Key) when is_binary(Key) ->

+ 14 - 2
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -6,6 +6,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include("emqx_ee_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -44,7 +45,6 @@ values(post) ->
     #{
         type => mysql,
         name => <<"foo">>,
-        local_topic => <<"local/topic/#">>,
         sql_template => ?DEFAULT_SQL,
         connector => #{
             server => <<"127.0.0.1:3306">>,
@@ -54,6 +54,19 @@ values(post) ->
             password => <<"">>,
             auto_reconnect => true
         },
+        resource_opts => #{
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            start_after_created => ?START_AFTER_CREATED,
+            start_timeout => ?START_TIMEOUT_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            query_mode => sync,
+            async_inflight_window => ?DEFAULT_INFLIGHT,
+            enable_batch => false,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            enable_queue => false,
+            max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+        },
         enable => true,
         direction => egress
     };
@@ -70,7 +83,6 @@ fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
         {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
-        {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
         {sql_template,
             mk(
                 binary(),