Преглед на файлове

fix(mysql): delete prepared statement for each channel

zmstone преди 1 година
родител
ревизия
2ab3d585d2

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mysql, [
     {description, "EMQX Enterprise MySQL Bridge"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -141,7 +141,7 @@ on_remove_channel(
     _InstanceId, #{channels := Channels, connector_state := ConnectorState} = State, ChannelId
 ) when is_map_key(ChannelId, Channels) ->
     ChannelConfig = maps:get(ChannelId, Channels),
-    emqx_mysql:unprepare_sql(maps:merge(ChannelConfig, ConnectorState)),
+    emqx_mysql:unprepare_sql(ChannelId, maps:merge(ChannelConfig, ConnectorState)),
     NewState = State#{channels => maps:remove(ChannelId, Channels)},
     {ok, NewState};
 on_remove_channel(_InstanceId, State, _ChannelId) ->

+ 1 - 1
apps/emqx_mysql/src/emqx_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_mysql, [
     {description, "EMQX MySQL Database Connector"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 38 - 15
apps/emqx_mysql/src/emqx_mysql.erl

@@ -36,14 +36,14 @@
 ]).
 
 %% ecpool connect & reconnect
--export([connect/1, prepare_sql_to_conn/2]).
+-export([connect/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
 
 -export([
     init_prepare/1,
     prepare_sql/2,
     parse_prepare_sql/1,
     parse_prepare_sql/2,
-    unprepare_sql/1
+    unprepare_sql/2
 ]).
 
 -export([roots/0, fields/1, namespace/0]).
@@ -356,13 +356,16 @@ do_prepare_sql(Templates, PoolName) ->
     prepare_sql_to_conn_list(Conns, Templates).
 
 get_connections_from_pool(PoolName) ->
-    [
-        begin
+    lists:map(
+        fun(Worker) ->
             {ok, Conn} = ecpool_worker:client(Worker),
             Conn
-        end
-     || {_Name, Worker} <- ecpool:workers(PoolName)
-    ].
+        end,
+        pool_workers(PoolName)
+    ).
+
+pool_workers(PoolName) ->
+    lists:map(fun({_Name, Worker}) -> Worker end, ecpool:workers(PoolName)).
 
 prepare_sql_to_conn_list([], _Templates) ->
     ok;
@@ -376,6 +379,21 @@ prepare_sql_to_conn_list([Conn | ConnList], Templates) ->
             {error, R}
     end.
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([Templates]) ->
+    [{{ChannelID, _}, _}] = lists:filter(
+        fun
+            ({{_, prepstmt}, _}) ->
+                true;
+            (_) ->
+                false
+        end,
+        Templates
+    ),
+    ChannelID.
+
 prepare_sql_to_conn(_Conn, []) ->
     ok;
 prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
@@ -400,16 +418,21 @@ prepare_sql_to_conn(Conn, [{{Key, prepstmt}, {SQL, _RowTemplate}} | Rest]) ->
 prepare_sql_to_conn(Conn, [{_Key, _Template} | Rest]) ->
     prepare_sql_to_conn(Conn, Rest).
 
-unprepare_sql(#{query_templates := Templates, pool_name := PoolName}) ->
-    ecpool:remove_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn}),
+unprepare_sql(ChannelID, #{query_templates := Templates, pool_name := PoolName}) ->
     lists:foreach(
-        fun(Conn) ->
-            lists:foreach(
-                fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
-                maps:to_list(Templates)
-            )
+        fun(Worker) ->
+            ok = ecpool_worker:remove_reconnect_callback_by_signature(Worker, ChannelID),
+            case ecpool_worker:client(Worker) of
+                {ok, Conn} ->
+                    lists:foreach(
+                        fun(Template) -> unprepare_sql_to_conn(Conn, Template) end,
+                        maps:to_list(Templates)
+                    );
+                _ ->
+                    ok
+            end
         end,
-        get_connections_from_pool(PoolName)
+        pool_workers(PoolName)
     ).
 
 unprepare_sql_to_conn(Conn, {{Key, prepstmt}, _}) ->