Browse Source

fix(action/postgres): ensure no reconnect callback duplication

zmstone 1 năm trước cách đây
mục cha
commit
401264b929
1 tập tin đã thay đổi với 32 bổ sung17 xóa
  1. 32 17
      apps/emqx_postgresql/src/emqx_postgresql.erl

+ 32 - 17
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -54,7 +54,7 @@
 -export([disable_prepared_statements/0]).
 
 %% for ecpool workers usage
--export([do_get_status/1, prepare_sql_to_conn/2]).
+-export([do_get_status/1, prepare_sql_to_conn/2, get_reconnect_callback_signature/1]).
 
 -define(PGSQL_HOST_OPTIONS, #{
     default_port => ?PGSQL_DEFAULT_PORT
@@ -279,9 +279,15 @@ close_prepared_statement([WorkerPid | Rest], ChannelId, State) ->
     %% again anyway.
     try ecpool_worker:client(WorkerPid) of
         {ok, Conn} ->
-            Statement = get_templated_statement(ChannelId, State),
-            _ = epgsql:close(Conn, Statement),
-            close_prepared_statement(Rest, ChannelId, State);
+            ok = ecpool_worker:remove_reconnect_callback_by_signature(WorkerPid, ChannelId),
+            case get_templated_statement(ChannelId, State) of
+                {ok, Statement} ->
+                    _ = epgsql:close(Conn, Statement),
+                    close_prepared_statement(Rest, ChannelId, State);
+                error ->
+                    %% channel was not added
+                    ok
+            end;
         _ ->
             close_prepared_statement(Rest, ChannelId, State)
     catch
@@ -375,7 +381,7 @@ on_batch_query(
             ?SLOG(error, Log),
             {error, {unrecoverable_error, batch_prepare_not_implemented}};
         {_Statement, RowTemplate} ->
-            StatementTemplate = get_templated_statement(BinKey, State),
+            {ok, StatementTemplate} = get_templated_statement(BinKey, State),
             Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
             emqx_trace:rendered_action_template(
                 Key,
@@ -443,20 +449,23 @@ get_template(Key, #{query_templates := Templates}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, Templates, undefined).
 
-get_templated_statement(Key, #{installed_channels := Channels} = _State) when
-    is_map_key(Key, Channels)
-->
+get_templated_statement(Key, #{installed_channels := Channels} = _State) ->
     BinKey = to_bin(Key),
-    ChannelState = maps:get(BinKey, Channels),
-    case ChannelState of
-        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
-            ExprTemplate;
-        #{prepares := #{BinKey := ExprTemplate}} ->
-            ExprTemplate
+    case is_map_key(BinKey, Channels) of
+        true ->
+            ChannelState = maps:get(BinKey, Channels),
+            case ChannelState of
+                #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
+                    {ok, ExprTemplate};
+                #{prepares := #{BinKey := ExprTemplate}} ->
+                    {ok, ExprTemplate}
+            end;
+        false ->
+            error
     end;
 get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
-    maps:get(BinKey, PrepStatements).
+    {ok, maps:get(BinKey, PrepStatements)}.
 
 on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
     try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
@@ -644,13 +653,13 @@ conn_opts([Opt = {ssl_opts, _} | Opts], Acc) ->
 conn_opts([_Opt | Opts], Acc) ->
     conn_opts(Opts, Acc).
 
-parse_sql_template(Config, SQLID) ->
+parse_sql_template(Config, ChannelId) ->
     Queries =
         case Config of
             #{prepare_statement := Qs} ->
                 Qs;
             #{sql := Query} ->
-                #{SQLID => Query};
+                #{ChannelId => Query};
             #{} ->
                 #{}
         end,
@@ -700,6 +709,12 @@ prepare_sql(Templates, PoolName) ->
             Error
     end.
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
+    ChannelId.
+
 do_prepare_sql(Templates, PoolName) ->
     do_prepare_sql(ecpool:workers(PoolName), Templates, #{}).