Просмотр исходного кода

Merge pull request #9699 from thalesmg/fix-buffer-clear-replayq-on-delete-v50

fix(buffer): fix `replayq` usages in buffer workers (5.0)
Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
167b623497
25 измененных файлов с 1539 добавлено и 610 удалено
  1. 5 2
      apps/emqx_bridge/src/emqx_bridge.erl
  2. 21 24
      apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
  3. 2 1
      apps/emqx_connector/src/emqx_connector_http.erl
  4. 10 1
      apps/emqx_connector/src/emqx_connector_mqtt.erl
  5. 6 6
      apps/emqx_connector/src/emqx_connector_mysql.erl
  6. 44 23
      apps/emqx_connector/src/emqx_connector_pgsql.erl
  7. 16 4
      apps/emqx_connector/src/emqx_connector_redis.erl
  8. 6 2
      apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
  9. 11 0
      apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
  10. 1 6
      apps/emqx_resource/src/emqx_resource.erl
  11. 705 388
      apps/emqx_resource/src/emqx_resource_worker.erl
  12. 6 1
      apps/emqx_resource/src/emqx_resource_worker_sup.erl
  13. 6 0
      apps/emqx_resource/src/schema/emqx_resource_schema.erl
  14. 66 7
      apps/emqx_resource/test/emqx_connector_demo.erl
  15. 504 57
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  16. 0 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
  17. 22 26
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl
  18. 26 12
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl
  19. 40 19
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl
  20. 28 17
      lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl
  21. 1 1
      lib-ee/emqx_ee_connector/rebar.config
  22. 5 4
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl
  23. 5 5
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl
  24. 2 2
      mix.exs
  25. 1 1
      rebar.config

+ 5 - 2
apps/emqx_bridge/src/emqx_bridge.erl

@@ -170,8 +170,11 @@ send_message(BridgeId, Message) ->
     case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
     case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
         not_found ->
         not_found ->
             {error, {bridge_not_found, BridgeId}};
             {error, {bridge_not_found, BridgeId}};
-        #{enable := true} ->
-            emqx_resource:query(ResId, {send_message, Message});
+        #{enable := true} = Config ->
+            Timeout = emqx_map_lib:deep_get(
+                [resource_opts, request_timeout], Config, timer:seconds(15)
+            ),
+            emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout});
         #{enable := false} ->
         #{enable := false} ->
             {error, {bridge_stopped, BridgeId}}
             {error, {bridge_stopped, BridgeId}}
     end.
     end.

+ 21 - 24
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -145,10 +145,12 @@ set_special_configs(_) ->
 
 
 init_per_testcase(_, Config) ->
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+    ok = snabbkaffe:start_trace(),
     Config.
     Config.
 end_per_testcase(_, _Config) ->
 end_per_testcase(_, _Config) ->
     clear_resources(),
     clear_resources(),
     emqx_common_test_helpers:call_janitor(),
     emqx_common_test_helpers:call_janitor(),
+    snabbkaffe:stop(),
     ok.
     ok.
 
 
 clear_resources() ->
 clear_resources() ->
@@ -478,8 +480,6 @@ t_egress_custom_clientid_prefix(_Config) ->
     end,
     end,
 
 
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
-    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
-
     ok.
     ok.
 
 
 t_mqtt_conn_bridge_ingress_and_egress(_) ->
 t_mqtt_conn_bridge_ingress_and_egress(_) ->
@@ -830,6 +830,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             <<"resource_opts">> => #{
             <<"resource_opts">> => #{
                 <<"worker_pool_size">> => 2,
                 <<"worker_pool_size">> => 2,
                 <<"query_mode">> => <<"sync">>,
                 <<"query_mode">> => <<"sync">>,
+                <<"request_timeout">> => <<"500ms">>,
                 %% to make it check the healthy quickly
                 %% to make it check the healthy quickly
                 <<"health_check_interval">> => <<"0.5s">>
                 <<"health_check_interval">> => <<"0.5s">>
             }
             }
@@ -880,17 +881,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     ok = emqx_listeners:stop_listener('tcp:default'),
     ok = emqx_listeners:stop_listener('tcp:default'),
     ct:sleep(1500),
     ct:sleep(1500),
 
 
-    %% PUBLISH 2 messages to the 'local' broker, the message should
-    ok = snabbkaffe:start_trace(),
+    %% PUBLISH 2 messages to the 'local' broker, the messages should
+    %% be enqueued and the resource will block
     {ok, SRef} =
     {ok, SRef} =
         snabbkaffe:subscribe(
         snabbkaffe:subscribe(
             fun
             fun
-                (
-                    #{
-                        ?snk_kind := call_query_enter,
-                        query := {query, _From, {send_message, #{}}, _Sent}
-                    }
-                ) ->
+                (#{?snk_kind := resource_worker_retry_inflight_failed}) ->
+                    true;
+                (#{?snk_kind := resource_worker_flush_nack}) ->
                     true;
                     true;
                 (_) ->
                 (_) ->
                     false
                     false
@@ -903,7 +901,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     emqx:publish(emqx_message:make(LocalTopic, Payload1)),
     emqx:publish(emqx_message:make(LocalTopic, Payload1)),
     emqx:publish(emqx_message:make(LocalTopic, Payload2)),
     emqx:publish(emqx_message:make(LocalTopic, Payload2)),
     {ok, _} = snabbkaffe:receive_events(SRef),
     {ok, _} = snabbkaffe:receive_events(SRef),
-    ok = snabbkaffe:stop(),
 
 
     %% verify the metrics of the bridge, the message should be queued
     %% verify the metrics of the bridge, the message should be queued
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
@@ -920,7 +917,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             <<"matched">> := Matched,
             <<"matched">> := Matched,
             <<"success">> := 1,
             <<"success">> := 1,
             <<"failed">> := 0,
             <<"failed">> := 0,
-            <<"queuing">> := 2
+            <<"queuing">> := 1,
+            <<"inflight">> := 1
         } when Matched >= 3,
         } when Matched >= 3,
         maps:get(<<"metrics">>, DecodedMetrics1)
         maps:get(<<"metrics">>, DecodedMetrics1)
     ),
     ),
@@ -952,18 +950,17 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     ok.
     ok.
 
 
 assert_mqtt_msg_received(Topic, Payload) ->
 assert_mqtt_msg_received(Topic, Payload) ->
-    ?assert(
-        receive
-            {deliver, Topic, #message{payload = Payload}} ->
-                ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
-                true;
-            Msg ->
-                ct:pal("Unexpected Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ).
+    ct:pal("checking if ~p has been received on ~p", [Payload, Topic]),
+    receive
+        {deliver, Topic, #message{payload = Payload}} ->
+            ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
+            ok;
+        Msg ->
+            ct:pal("Unexpected Msg: ~p", [Msg]),
+            assert_mqtt_msg_received(Topic, Payload)
+    after 100 ->
+        ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic])
+    end.
 
 
 request(Method, Url, Body) ->
 request(Method, Url, Body) ->
     request(<<"connector_admin">>, Method, Url, Body).
     request(<<"connector_admin">>, Method, Url, Body).

+ 2 - 1
apps/emqx_connector/src/emqx_connector_http.erl

@@ -380,7 +380,8 @@ on_query_async(
         NRequest,
         NRequest,
         Timeout,
         Timeout,
         {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
         {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]}
-    ).
+    ),
+    {ok, Worker}.
 
 
 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
 on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
     case do_get_status(PoolName, Timeout) of
     case do_get_status(PoolName, Timeout) of

+ 10 - 1
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -198,7 +198,10 @@ on_query_async(
     #{name := InstanceId}
     #{name := InstanceId}
 ) ->
 ) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
+    %% this is a cast, currently.
+    ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}),
+    WorkerPid = get_worker_pid(InstanceId),
+    {ok, WorkerPid}.
 
 
 on_get_status(_InstId, #{name := InstanceId}) ->
 on_get_status(_InstId, #{name := InstanceId}) ->
     case emqx_connector_mqtt_worker:status(InstanceId) of
     case emqx_connector_mqtt_worker:status(InstanceId) of
@@ -212,6 +215,12 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
         {error, Reason} -> {error, Reason}
         {error, Reason} -> {error, Reason}
     end.
     end.
 
 
+%% mqtt workers, when created and called via bridge callbacks, are
+%% registered.
+-spec get_worker_pid(atom()) -> pid().
+get_worker_pid(InstanceId) ->
+    whereis(InstanceId).
+
 make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
 make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
     undefined;
     undefined;
 make_sub_confs(undefined, _Conf, _) ->
 make_sub_confs(undefined, _Conf, _) ->

+ 6 - 6
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -192,7 +192,7 @@ on_batch_query(
         {Key, _} ->
         {Key, _} ->
             case maps:get(Key, Inserts, undefined) of
             case maps:get(Key, Inserts, undefined) of
                 undefined ->
                 undefined ->
-                    {error, batch_select_not_implemented};
+                    {error, {unrecoverable_error, batch_select_not_implemented}};
                 InsertSQL ->
                 InsertSQL ->
                     Tokens = maps:get(Key, ParamsTokens),
                     Tokens = maps:get(Key, ParamsTokens),
                     on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
                     on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
@@ -200,7 +200,7 @@ on_batch_query(
         Request ->
         Request ->
             LogMeta = #{connector => InstId, first_request => Request, state => State},
             LogMeta = #{connector => InstId, first_request => Request, state => State},
             ?SLOG(error, LogMeta#{msg => "invalid request"}),
             ?SLOG(error, LogMeta#{msg => "invalid request"}),
-            {error, invalid_request}
+            {error, {unrecoverable_error, invalid_request}}
     end.
     end.
 
 
 mysql_function(sql) ->
 mysql_function(sql) ->
@@ -267,7 +267,7 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
 maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
 maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
     case maps:is_key(SQLOrKey, Prepares) of
     case maps:is_key(SQLOrKey, Prepares) of
         true -> prepare_sql(Prepares, PoolName);
         true -> prepare_sql(Prepares, PoolName);
-        false -> {error, prepared_statement_invalid}
+        false -> {error, {unrecoverable_error, prepared_statement_invalid}}
     end.
     end.
 
 
 prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
 prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
@@ -465,12 +465,12 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
             ),
             ),
             {error, {recoverable_error, Reason}};
             {error, {recoverable_error, Reason}};
-        {error, Reason} = Result ->
+        {error, Reason} ->
             ?SLOG(
             ?SLOG(
                 error,
                 error,
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
             ),
             ),
-            Result;
+            {error, {unrecoverable_error, Reason}};
         Result ->
         Result ->
             ?tp(
             ?tp(
                 mysql_connector_query_return,
                 mysql_connector_query_return,
@@ -483,5 +483,5 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
                 error,
                 error,
                 LogMeta#{msg => "mysql_connector_invalid_params", params => Data}
                 LogMeta#{msg => "mysql_connector_invalid_params", params => Data}
             ),
             ),
-            {error, {invalid_params, Data}}
+            {error, {unrecoverable_error, {invalid_params, Data}}}
     end.
     end.

+ 44 - 23
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -153,7 +153,8 @@ on_query(
     }),
     }),
     Type = pgsql_query_type(TypeOrKey),
     Type = pgsql_query_type(TypeOrKey),
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
-    on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data).
+    Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
+    handle_result(Res).
 
 
 pgsql_query_type(sql) ->
 pgsql_query_type(sql) ->
     query;
     query;
@@ -182,23 +183,17 @@ on_batch_query(
                         msg => "batch prepare not implemented"
                         msg => "batch prepare not implemented"
                     },
                     },
                     ?SLOG(error, Log),
                     ?SLOG(error, Log),
-                    {error, batch_prepare_not_implemented};
+                    {error, {unrecoverable_error, batch_prepare_not_implemented}};
                 TokenList ->
                 TokenList ->
                     {_, Datas} = lists:unzip(BatchReq),
                     {_, Datas} = lists:unzip(BatchReq),
                     Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
                     Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
                     St = maps:get(BinKey, Sts),
                     St = maps:get(BinKey, Sts),
-                    {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2),
-                    %% this local function only suits for the result of batch insert
-                    TransResult = fun
-                        Trans([{ok, Count} | T], Acc) ->
-                            Trans(T, Acc + Count);
-                        Trans([{error, _} = Error | _], _Acc) ->
-                            Error;
-                        Trans([], Acc) ->
-                            {ok, Acc}
-                    end,
-
-                    TransResult(Results, 0)
+                    case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
+                        {error, Error} ->
+                            {error, Error};
+                        {_Column, Results} ->
+                            handle_batch_result(Results, 0)
+                    end
             end;
             end;
         _ ->
         _ ->
             Log = #{
             Log = #{
@@ -208,7 +203,7 @@ on_batch_query(
                 msg => "invalid request"
                 msg => "invalid request"
             },
             },
             ?SLOG(error, Log),
             ?SLOG(error, Log),
-            {error, invalid_request}
+            {error, {unrecoverable_error, invalid_request}}
     end.
     end.
 
 
 proc_sql_params(query, SQLOrKey, Params, _State) ->
 proc_sql_params(query, SQLOrKey, Params, _State) ->
@@ -225,24 +220,38 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
     end.
     end.
 
 
 on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
 on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
-    Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover),
-    case Result of
-        {error, Reason} ->
+    try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
+        {error, Reason} = Result ->
+            ?tp(
+                pgsql_connector_query_return,
+                #{error => Reason}
+            ),
             ?SLOG(error, #{
             ?SLOG(error, #{
                 msg => "postgresql connector do sql query failed",
                 msg => "postgresql connector do sql query failed",
                 connector => InstId,
                 connector => InstId,
                 type => Type,
                 type => Type,
                 sql => NameOrSQL,
                 sql => NameOrSQL,
                 reason => Reason
                 reason => Reason
-            });
-        _ ->
+            }),
+            Result;
+        Result ->
             ?tp(
             ?tp(
                 pgsql_connector_query_return,
                 pgsql_connector_query_return,
                 #{result => Result}
                 #{result => Result}
             ),
             ),
-            ok
-    end,
-    Result.
+            Result
+    catch
+        error:function_clause:Stacktrace ->
+            ?SLOG(error, #{
+                msg => "postgresql connector do sql query failed",
+                connector => InstId,
+                type => Type,
+                sql => NameOrSQL,
+                reason => function_clause,
+                stacktrace => Stacktrace
+            }),
+            {error, {unrecoverable_error, invalid_request}}
+    end.
 
 
 on_get_status(_InstId, #{poolname := Pool} = State) ->
 on_get_status(_InstId, #{poolname := Pool} = State) ->
     case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
     case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
@@ -407,3 +416,15 @@ to_bin(Bin) when is_binary(Bin) ->
     Bin;
     Bin;
 to_bin(Atom) when is_atom(Atom) ->
 to_bin(Atom) when is_atom(Atom) ->
     erlang:atom_to_binary(Atom).
     erlang:atom_to_binary(Atom).
+
+handle_result({error, Error}) ->
+    {error, {unrecoverable_error, Error}};
+handle_result(Res) ->
+    Res.
+
+handle_batch_result([{ok, Count} | Rest], Acc) ->
+    handle_batch_result(Rest, Acc + Count);
+handle_batch_result([{error, Error} | _Rest], _Acc) ->
+    {error, {unrecoverable_error, Error}};
+handle_batch_result([], Acc) ->
+    {ok, Acc}.

+ 16 - 4
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -207,11 +207,23 @@ do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) ->
                 connector => InstId,
                 connector => InstId,
                 query => Query,
                 query => Query,
                 reason => Reason
                 reason => Reason
-            });
+            }),
+            case is_unrecoverable_error(Reason) of
+                true ->
+                    {error, {unrecoverable_error, Reason}};
+                false ->
+                    Result
+            end;
         _ ->
         _ ->
-            ok
-    end,
-    Result.
+            Result
+    end.
+
+is_unrecoverable_error(Results) when is_list(Results) ->
+    lists:any(fun is_unrecoverable_error/1, Results);
+is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) ->
+    true;
+is_unrecoverable_error(_) ->
+    false.
 
 
 extract_eredis_cluster_workers(PoolName) ->
 extract_eredis_cluster_workers(PoolName) ->
     lists:flatten([
     lists:flatten([

+ 6 - 2
apps/emqx_connector/test/emqx_connector_redis_SUITE.erl

@@ -128,8 +128,12 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
         emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]})
         emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]})
     ),
     ),
     ?assertMatch(
     ?assertMatch(
-        {error, [{ok, <<"PONG">>}, {error, _}]},
-        emqx_resource:query(PoolName, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]})
+        {error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}},
+        emqx_resource:query(
+            PoolName,
+            {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]},
+            #{timeout => 500}
+        )
     ),
     ),
     ?assertEqual(ok, emqx_resource:stop(PoolName)),
     ?assertEqual(ok, emqx_resource:stop(PoolName)),
     % Resource will be listed still, but state will be changed and healthcheck will fail
     % Resource will be listed still, but state will be changed and healthcheck will fail

+ 11 - 0
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
     }
     }
   }
   }
 
 
+  request_timeout {
+    desc {
+      en: """Timeout for requests.  If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
+      zh: """请求的超时。 如果<code>query_mode</code>是<code>sync</code>,对资源的调用将在超时前被阻断这一时间。"""
+    }
+    label {
+      en: """Request timeout"""
+      zh: """请求超时"""
+    }
+  }
+
   enable_batch {
   enable_batch {
     desc {
     desc {
       en: """Batch mode enabled."""
       en: """Batch mode enabled."""

+ 1 - 6
apps/emqx_resource/src/emqx_resource.erl

@@ -79,8 +79,7 @@
     query/2,
     query/2,
     query/3,
     query/3,
     %% query the instance without batching and queuing messages.
     %% query the instance without batching and queuing messages.
-    simple_sync_query/2,
-    simple_async_query/3
+    simple_sync_query/2
 ]).
 ]).
 
 
 %% Direct calls to the callback module
 %% Direct calls to the callback module
@@ -278,10 +277,6 @@ query(ResId, Request, Opts) ->
 simple_sync_query(ResId, Request) ->
 simple_sync_query(ResId, Request) ->
     emqx_resource_worker:simple_sync_query(ResId, Request).
     emqx_resource_worker:simple_sync_query(ResId, Request).
 
 
--spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term().
-simple_async_query(ResId, Request, ReplyFun) ->
-    emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun).
-
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 start(ResId) ->
 start(ResId) ->
     start(ResId, #{}).
     start(ResId, #{}).

Разница между файлами не показана из-за своего большого размера
+ 705 - 388
apps/emqx_resource/src/emqx_resource_worker.erl


+ 6 - 1
apps/emqx_resource/src/emqx_resource_worker_sup.erl

@@ -67,7 +67,8 @@ stop_workers(ResId, Opts) ->
     WorkerPoolSize = worker_pool_size(Opts),
     WorkerPoolSize = worker_pool_size(Opts),
     lists:foreach(
     lists:foreach(
         fun(Idx) ->
         fun(Idx) ->
-            ensure_worker_removed(ResId, Idx)
+            _ = ensure_worker_removed(ResId, Idx),
+            ensure_disk_queue_dir_absent(ResId, Idx)
         end,
         end,
         lists:seq(1, WorkerPoolSize)
         lists:seq(1, WorkerPoolSize)
     ),
     ),
@@ -127,6 +128,10 @@ ensure_worker_removed(ResId, Idx) ->
             {error, Reason}
             {error, Reason}
     end.
     end.
 
 
+ensure_disk_queue_dir_absent(ResourceId, Index) ->
+    ok = emqx_resource_worker:clear_disk_queue_dir(ResourceId, Index),
+    ok.
+
 ensure_worker_pool_removed(ResId) ->
 ensure_worker_pool_removed(ResId) ->
     try
     try
         gproc_pool:delete(ResId)
         gproc_pool:delete(ResId)

+ 6 - 0
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -48,6 +48,7 @@ fields("creation_opts") ->
         {health_check_interval, fun health_check_interval/1},
         {health_check_interval, fun health_check_interval/1},
         {auto_restart_interval, fun auto_restart_interval/1},
         {auto_restart_interval, fun auto_restart_interval/1},
         {query_mode, fun query_mode/1},
         {query_mode, fun query_mode/1},
+        {request_timeout, fun request_timeout/1},
         {async_inflight_window, fun async_inflight_window/1},
         {async_inflight_window, fun async_inflight_window/1},
         {enable_batch, fun enable_batch/1},
         {enable_batch, fun enable_batch/1},
         {batch_size, fun batch_size/1},
         {batch_size, fun batch_size/1},
@@ -80,6 +81,11 @@ query_mode(default) -> async;
 query_mode(required) -> false;
 query_mode(required) -> false;
 query_mode(_) -> undefined.
 query_mode(_) -> undefined.
 
 
+request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
+request_timeout(desc) -> ?DESC("request_timeout");
+request_timeout(default) -> <<"15s">>;
+request_timeout(_) -> undefined.
+
 enable_batch(type) -> boolean();
 enable_batch(type) -> boolean();
 enable_batch(required) -> false;
 enable_batch(required) -> false;
 enable_batch(default) -> true;
 enable_batch(default) -> true;

+ 66 - 7
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -85,9 +85,25 @@ on_query(_InstId, get_state_failed, State) ->
 on_query(_InstId, block, #{pid := Pid}) ->
 on_query(_InstId, block, #{pid := Pid}) ->
     Pid ! block,
     Pid ! block,
     ok;
     ok;
+on_query(_InstId, block_now, #{pid := Pid}) ->
+    Pid ! block,
+    {error, {resource_error, #{reason => blocked, msg => blocked}}};
 on_query(_InstId, resume, #{pid := Pid}) ->
 on_query(_InstId, resume, #{pid := Pid}) ->
     Pid ! resume,
     Pid ! resume,
     ok;
     ok;
+on_query(_InstId, {big_payload, Payload}, #{pid := Pid}) ->
+    ReqRef = make_ref(),
+    From = {self(), ReqRef},
+    Pid ! {From, {big_payload, Payload}},
+    receive
+        {ReqRef, ok} ->
+            ?tp(connector_demo_big_payload, #{payload => Payload}),
+            ok;
+        {ReqRef, incorrect_status} ->
+            {error, {recoverable_error, incorrect_status}}
+    after 1000 ->
+        {error, timeout}
+    end;
 on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
 on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
     ReqRef = make_ref(),
     ReqRef = make_ref(),
     From = {self(), ReqRef},
     From = {self(), ReqRef},
@@ -122,10 +138,16 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
 
 
 on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
 on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
     Pid ! {inc, N, ReplyFun},
     Pid ! {inc, N, ReplyFun},
-    ok;
+    {ok, Pid};
 on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
 on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
     Pid ! {get, ReplyFun},
     Pid ! {get, ReplyFun},
-    ok.
+    {ok, Pid};
+on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
+    Pid ! {block_now, ReplyFun},
+    {ok, Pid};
+on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
+    Pid ! {big_payload, Payload, ReplyFun},
+    {ok, Pid}.
 
 
 on_batch_query(InstId, BatchReq, State) ->
 on_batch_query(InstId, BatchReq, State) ->
     %% Requests can be either 'get_counter' or 'inc_counter', but
     %% Requests can be either 'get_counter' or 'inc_counter', but
@@ -134,17 +156,22 @@ on_batch_query(InstId, BatchReq, State) ->
         {inc_counter, _} ->
         {inc_counter, _} ->
             batch_inc_counter(sync, InstId, BatchReq, State);
             batch_inc_counter(sync, InstId, BatchReq, State);
         get_counter ->
         get_counter ->
-            batch_get_counter(sync, InstId, State)
+            batch_get_counter(sync, InstId, State);
+        {big_payload, _Payload} ->
+            batch_big_payload(sync, InstId, BatchReq, State)
     end.
     end.
 
 
 on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
 on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
-    %% Requests can be either 'get_counter' or 'inc_counter', but
-    %% cannot be mixed.
+    %% Requests can be of multiple types, but cannot be mixed.
     case hd(BatchReq) of
     case hd(BatchReq) of
         {inc_counter, _} ->
         {inc_counter, _} ->
             batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State);
             batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State);
         get_counter ->
         get_counter ->
-            batch_get_counter({async, ReplyFunAndArgs}, InstId, State)
+            batch_get_counter({async, ReplyFunAndArgs}, InstId, State);
+        block_now ->
+            on_query_async(InstId, block_now, ReplyFunAndArgs, State);
+        {big_payload, _Payload} ->
+            batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State)
     end.
     end.
 
 
 batch_inc_counter(CallMode, InstId, BatchReq, State) ->
 batch_inc_counter(CallMode, InstId, BatchReq, State) ->
@@ -171,6 +198,19 @@ batch_get_counter(sync, InstId, State) ->
 batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
 batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
     on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
     on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
 
 
+batch_big_payload(sync, InstId, Batch, State) ->
+    [Res | _] = lists:map(
+        fun(Req = {big_payload, _}) -> on_query(InstId, Req, State) end,
+        Batch
+    ),
+    Res;
+batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) ->
+    lists:foreach(
+        fun(Req = {big_payload, _}) -> on_query_async(InstId, Req, ReplyFunAndArgs, State) end,
+        Batch
+    ),
+    {ok, Pid}.
+
 on_get_status(_InstId, #{health_check_error := true}) ->
 on_get_status(_InstId, #{health_check_error := true}) ->
     disconnected;
     disconnected;
 on_get_status(_InstId, #{pid := Pid}) ->
 on_get_status(_InstId, #{pid := Pid}) ->
@@ -186,7 +226,11 @@ spawn_counter_process(Name, Register) ->
     Pid.
     Pid.
 
 
 counter_loop() ->
 counter_loop() ->
-    counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}).
+    counter_loop(#{
+        counter => 0,
+        status => running,
+        incorrect_status_count => 0
+    }).
 
 
 counter_loop(
 counter_loop(
     #{
     #{
@@ -200,6 +244,12 @@ counter_loop(
             block ->
             block ->
                 ct:pal("counter recv: ~p", [block]),
                 ct:pal("counter recv: ~p", [block]),
                 State#{status => blocked};
                 State#{status => blocked};
+            {block_now, ReplyFun} ->
+                ct:pal("counter recv: ~p", [block_now]),
+                apply_reply(
+                    ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}}
+                ),
+                State#{status => blocked};
             resume ->
             resume ->
                 {messages, Msgs} = erlang:process_info(self(), messages),
                 {messages, Msgs} = erlang:process_info(self(), messages),
                 ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
                 ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),
@@ -209,6 +259,9 @@ counter_loop(
                 apply_reply(ReplyFun, ok),
                 apply_reply(ReplyFun, ok),
                 ?tp(connector_demo_inc_counter_async, #{n => N}),
                 ?tp(connector_demo_inc_counter_async, #{n => N}),
                 State#{counter => Num + N};
                 State#{counter => Num + N};
+            {big_payload, _Payload, ReplyFun} when Status == blocked ->
+                apply_reply(ReplyFun, {error, blocked}),
+                State;
             {{FromPid, ReqRef}, {inc, N}} when Status == running ->
             {{FromPid, ReqRef}, {inc, N}} when Status == running ->
                 %ct:pal("sync counter recv: ~p", [{inc, N}]),
                 %ct:pal("sync counter recv: ~p", [{inc, N}]),
                 FromPid ! {ReqRef, ok},
                 FromPid ! {ReqRef, ok},
@@ -216,6 +269,12 @@ counter_loop(
             {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
             {{FromPid, ReqRef}, {inc, _N}} when Status == blocked ->
                 FromPid ! {ReqRef, incorrect_status},
                 FromPid ! {ReqRef, incorrect_status},
                 State#{incorrect_status_count := IncorrectCount + 1};
                 State#{incorrect_status_count := IncorrectCount + 1};
+            {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
+                FromPid ! {ReqRef, incorrect_status},
+                State#{incorrect_status_count := IncorrectCount + 1};
+            {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running ->
+                FromPid ! {ReqRef, ok},
+                State;
             {get, ReplyFun} ->
             {get, ReplyFun} ->
                 apply_reply(ReplyFun, Num),
                 apply_reply(ReplyFun, Num),
                 State;
                 State;

+ 504 - 57
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -411,22 +411,18 @@ t_query_counter_async_inflight(_) ->
 
 
     %% send async query to make the inflight window full
     %% send async query to make the inflight window full
     ?check_trace(
     ?check_trace(
-        ?TRACE_OPTS,
-        inc_counter_in_parallel(WindowSize, ReqOpts),
+        {_, {ok, _}} =
+            ?wait_async_action(
+                inc_counter_in_parallel(WindowSize, ReqOpts),
+                #{?snk_kind := resource_worker_flush_but_inflight_full},
+                1_000
+            ),
         fun(Trace) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
         end
         end
     ),
     ),
     tap_metrics(?LINE),
     tap_metrics(?LINE),
-
-    %% this will block the resource_worker as the inflight window is full now
-    {ok, {ok, _}} =
-        ?wait_async_action(
-            emqx_resource:query(?ID, {inc_counter, 2}),
-            #{?snk_kind := resource_worker_enter_blocked},
-            1_000
-        ),
     ?assertMatch(0, ets:info(Tab0, size)),
     ?assertMatch(0, ets:info(Tab0, size)),
 
 
     tap_metrics(?LINE),
     tap_metrics(?LINE),
@@ -436,17 +432,16 @@ t_query_counter_async_inflight(_) ->
         ets:insert(Tab, {Ref, Result}),
         ets:insert(Tab, {Ref, Result}),
         ?tp(tmp_query_inserted, #{})
         ?tp(tmp_query_inserted, #{})
     end,
     end,
+    %% since this counts as a failure, it'll be enqueued and retried
+    %% later, when the resource is unblocked.
     {ok, {ok, _}} =
     {ok, {ok, _}} =
         ?wait_async_action(
         ?wait_async_action(
-            emqx_resource:query(?ID, {inc_counter, 3}, #{
+            emqx_resource:query(?ID, {inc_counter, 99}, #{
                 async_reply_fun => {Insert, [Tab0, tmp_query]}
                 async_reply_fun => {Insert, [Tab0, tmp_query]}
             }),
             }),
-            #{?snk_kind := tmp_query_inserted},
+            #{?snk_kind := resource_worker_appended_to_queue},
             1_000
             1_000
         ),
         ),
-    %% since this counts as a failure, it'll be enqueued and retried
-    %% later, when the resource is unblocked.
-    ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
     tap_metrics(?LINE),
     tap_metrics(?LINE),
 
 
     %% all responses should be received after the resource is resumed.
     %% all responses should be received after the resource is resumed.
@@ -455,46 +450,49 @@ t_query_counter_async_inflight(_) ->
         %% +1 because the tmp_query above will be retried and succeed
         %% +1 because the tmp_query above will be retried and succeed
         %% this time.
         %% this time.
         WindowSize + 1,
         WindowSize + 1,
-        _Timeout = 60_000
+        _Timeout0 = 10_000
     ),
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     tap_metrics(?LINE),
     tap_metrics(?LINE),
     {ok, _} = snabbkaffe:receive_events(SRef0),
     {ok, _} = snabbkaffe:receive_events(SRef0),
+    tap_metrics(?LINE),
     %% since the previous tmp_query was enqueued to be retried, we
     %% since the previous tmp_query was enqueued to be retried, we
     %% take it again from the table; this time, it should have
     %% take it again from the table; this time, it should have
     %% succeeded.
     %% succeeded.
     ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
     ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
-    ?assertEqual(WindowSize, ets:info(Tab0, size)),
-    tap_metrics(?LINE),
 
 
     %% send async query, this time everything should be ok.
     %% send async query, this time everything should be ok.
     Num = 10,
     Num = 10,
     ?check_trace(
     ?check_trace(
-        ?TRACE_OPTS,
         begin
         begin
             {ok, SRef} = snabbkaffe:subscribe(
             {ok, SRef} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                 ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                 Num,
                 Num,
-                _Timeout = 60_000
+                _Timeout0 = 10_000
             ),
             ),
-            inc_counter_in_parallel(Num, ReqOpts),
+            inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
             {ok, _} = snabbkaffe:receive_events(SRef),
             {ok, _} = snabbkaffe:receive_events(SRef),
             ok
             ok
         end,
         end,
         fun(Trace) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace),
+            ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+            tap_metrics(?LINE),
+            ok
         end
         end
     ),
     ),
-    ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
-    tap_metrics(?LINE),
 
 
     %% block the resource
     %% block the resource
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
     %% again, send async query to make the inflight window full
     %% again, send async query to make the inflight window full
     ?check_trace(
     ?check_trace(
-        ?TRACE_OPTS,
-        inc_counter_in_parallel(WindowSize, ReqOpts),
+        {_, {ok, _}} =
+            ?wait_async_action(
+                inc_counter_in_parallel(WindowSize, ReqOpts),
+                #{?snk_kind := resource_worker_flush_but_inflight_full},
+                1_000
+            ),
         fun(Trace) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@@ -502,17 +500,18 @@ t_query_counter_async_inflight(_) ->
     ),
     ),
 
 
     %% this will block the resource_worker
     %% this will block the resource_worker
-    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+    ok = emqx_resource:query(?ID, {inc_counter, 4}),
 
 
     Sent = WindowSize + Num + WindowSize,
     Sent = WindowSize + Num + WindowSize,
     {ok, SRef1} = snabbkaffe:subscribe(
     {ok, SRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
         WindowSize,
         WindowSize,
-        _Timeout = 60_000
+        _Timeout0 = 10_000
     ),
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     {ok, _} = snabbkaffe:receive_events(SRef1),
     {ok, _} = snabbkaffe:receive_events(SRef1),
-    ?assertEqual(Sent, ets:info(Tab0, size)),
+    ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+    tap_metrics(?LINE),
 
 
     {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
     {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
     ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
     ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@@ -572,7 +571,7 @@ t_query_counter_async_inflight_batch(_) ->
     end,
     end,
     ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
     ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
     BatchSize = 2,
     BatchSize = 2,
-    WindowSize = 3,
+    WindowSize = 15,
     {ok, _} = emqx_resource:create_local(
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?DEFAULT_RESOURCE_GROUP,
@@ -594,16 +593,12 @@ t_query_counter_async_inflight_batch(_) ->
     %% send async query to make the inflight window full
     %% send async query to make the inflight window full
     NumMsgs = BatchSize * WindowSize,
     NumMsgs = BatchSize * WindowSize,
     ?check_trace(
     ?check_trace(
-        begin
-            {ok, SRef} = snabbkaffe:subscribe(
-                ?match_event(#{?snk_kind := call_batch_query_async}),
-                WindowSize,
-                _Timeout = 60_000
+        {_, {ok, _}} =
+            ?wait_async_action(
+                inc_counter_in_parallel(NumMsgs, ReqOpts),
+                #{?snk_kind := resource_worker_flush_but_inflight_full},
+                5_000
             ),
             ),
-            inc_counter_in_parallel(NumMsgs, ReqOpts),
-            {ok, _} = snabbkaffe:receive_events(SRef),
-            ok
-        end,
         fun(Trace) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             ?assertMatch(
             ?assertMatch(
@@ -628,7 +623,7 @@ t_query_counter_async_inflight_batch(_) ->
             {ok, {ok, _}} =
             {ok, {ok, _}} =
                 ?wait_async_action(
                 ?wait_async_action(
                     emqx_resource:query(?ID, {inc_counter, 2}),
                     emqx_resource:query(?ID, {inc_counter, 2}),
-                    #{?snk_kind := resource_worker_enter_blocked},
+                    #{?snk_kind := resource_worker_flush_but_inflight_full},
                     5_000
                     5_000
                 ),
                 ),
             ?assertMatch(0, ets:info(Tab0, size)),
             ?assertMatch(0, ets:info(Tab0, size)),
@@ -644,17 +639,16 @@ t_query_counter_async_inflight_batch(_) ->
         ets:insert(Tab, {Ref, Result}),
         ets:insert(Tab, {Ref, Result}),
         ?tp(tmp_query_inserted, #{})
         ?tp(tmp_query_inserted, #{})
     end,
     end,
+    %% since this counts as a failure, it'll be enqueued and retried
+    %% later, when the resource is unblocked.
     {ok, {ok, _}} =
     {ok, {ok, _}} =
         ?wait_async_action(
         ?wait_async_action(
             emqx_resource:query(?ID, {inc_counter, 3}, #{
             emqx_resource:query(?ID, {inc_counter, 3}, #{
                 async_reply_fun => {Insert, [Tab0, tmp_query]}
                 async_reply_fun => {Insert, [Tab0, tmp_query]}
             }),
             }),
-            #{?snk_kind := tmp_query_inserted},
+            #{?snk_kind := resource_worker_appended_to_queue},
             1_000
             1_000
         ),
         ),
-    %% since this counts as a failure, it'll be enqueued and retried
-    %% later, when the resource is unblocked.
-    ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
     tap_metrics(?LINE),
     tap_metrics(?LINE),
 
 
     %% all responses should be received after the resource is resumed.
     %% all responses should be received after the resource is resumed.
@@ -663,7 +657,7 @@ t_query_counter_async_inflight_batch(_) ->
         %% +1 because the tmp_query above will be retried and succeed
         %% +1 because the tmp_query above will be retried and succeed
         %% this time.
         %% this time.
         WindowSize + 1,
         WindowSize + 1,
-        _Timeout = 60_000
+        10_000
     ),
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     tap_metrics(?LINE),
     tap_metrics(?LINE),
@@ -684,7 +678,7 @@ t_query_counter_async_inflight_batch(_) ->
             {ok, SRef} = snabbkaffe:subscribe(
             {ok, SRef} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                 ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                 NumBatches1,
                 NumBatches1,
-                _Timeout = 60_000
+                10_000
             ),
             ),
             inc_counter_in_parallel(NumMsgs1, ReqOpts),
             inc_counter_in_parallel(NumMsgs1, ReqOpts),
             {ok, _} = snabbkaffe:receive_events(SRef),
             {ok, _} = snabbkaffe:receive_events(SRef),
@@ -709,8 +703,12 @@ t_query_counter_async_inflight_batch(_) ->
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
     %% again, send async query to make the inflight window full
     %% again, send async query to make the inflight window full
     ?check_trace(
     ?check_trace(
-        ?TRACE_OPTS,
-        inc_counter_in_parallel(WindowSize, ReqOpts),
+        {_, {ok, _}} =
+            ?wait_async_action(
+                inc_counter_in_parallel(NumMsgs, ReqOpts),
+                #{?snk_kind := resource_worker_flush_but_inflight_full},
+                5_000
+            ),
         fun(Trace) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             ?assertMatch(
             ?assertMatch(
@@ -723,15 +721,15 @@ t_query_counter_async_inflight_batch(_) ->
     %% this will block the resource_worker
     %% this will block the resource_worker
     ok = emqx_resource:query(?ID, {inc_counter, 1}),
     ok = emqx_resource:query(?ID, {inc_counter, 1}),
 
 
-    Sent = NumMsgs + NumMsgs1 + WindowSize,
+    Sent = NumMsgs + NumMsgs1 + NumMsgs,
     {ok, SRef1} = snabbkaffe:subscribe(
     {ok, SRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
         WindowSize,
         WindowSize,
-        _Timeout = 60_000
+        10_000
     ),
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
     {ok, _} = snabbkaffe:receive_events(SRef1),
     {ok, _} = snabbkaffe:receive_events(SRef1),
-    ?assertEqual(Sent, ets:info(Tab0, size)),
+    ?assertEqual(Sent, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
 
 
     {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
     {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
     ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
     ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@@ -774,10 +772,8 @@ t_healthy_timeout(_) ->
         %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
         %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
         #{health_check_interval => 200}
         #{health_check_interval => 200}
     ),
     ),
-    ?assertMatch(
-        ?RESOURCE_ERROR(not_connected),
-        emqx_resource:query(?ID, get_state)
-    ),
+    ?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})),
+    ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
     ok = emqx_resource:remove_local(?ID).
     ok = emqx_resource:remove_local(?ID).
 
 
 t_healthy(_) ->
 t_healthy(_) ->
@@ -842,6 +838,8 @@ t_stop_start(_) ->
     ?assert(is_process_alive(Pid0)),
     ?assert(is_process_alive(Pid0)),
 
 
     %% metrics are reset when recreating
     %% metrics are reset when recreating
+    %% depending on timing, might show the request we just did.
+    ct:sleep(500),
     ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
     ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
 
 
     ok = emqx_resource:stop(?ID),
     ok = emqx_resource:stop(?ID),
@@ -861,6 +859,7 @@ t_stop_start(_) ->
     ?assert(is_process_alive(Pid1)),
     ?assert(is_process_alive(Pid1)),
 
 
     %% now stop while resetting the metrics
     %% now stop while resetting the metrics
+    ct:sleep(500),
     emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
     emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
     emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
     emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
     ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
     ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
@@ -1067,7 +1066,7 @@ t_retry_batch(_Config) ->
             %% batch shall remain enqueued.
             %% batch shall remain enqueued.
             {ok, _} =
             {ok, _} =
                 snabbkaffe:block_until(
                 snabbkaffe:block_until(
-                    ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}),
+                    ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}),
                     5_000
                     5_000
                 ),
                 ),
             %% should not have increased the matched count with the retries
             %% should not have increased the matched count with the retries
@@ -1079,7 +1078,7 @@ t_retry_batch(_Config) ->
             {ok, {ok, _}} =
             {ok, {ok, _}} =
                 ?wait_async_action(
                 ?wait_async_action(
                     ok = emqx_resource:simple_sync_query(?ID, resume),
                     ok = emqx_resource:simple_sync_query(?ID, resume),
-                    #{?snk_kind := resource_worker_retry_queue_batch_succeeded},
+                    #{?snk_kind := resource_worker_retry_inflight_succeeded},
                     5_000
                     5_000
                 ),
                 ),
             %% 1 more because of the `resume' call
             %% 1 more because of the `resume' call
@@ -1116,6 +1115,390 @@ t_retry_batch(_Config) ->
     ),
     ),
     ok.
     ok.
 
 
+t_delete_and_re_create_with_same_name(_Config) ->
+    NumBufferWorkers = 2,
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 1,
+            worker_pool_size => NumBufferWorkers,
+            queue_seg_bytes => 100,
+            resume_interval => 1_000
+        }
+    ),
+    %% pre-condition: we should have just created a new queue
+    Queuing0 = emqx_resource_metrics:queuing_get(?ID),
+    Inflight0 = emqx_resource_metrics:inflight_get(?ID),
+    ?assertEqual(0, Queuing0),
+    ?assertEqual(0, Inflight0),
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
+            NumRequests = 10,
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := resource_worker_enter_blocked}),
+                NumBufferWorkers,
+                _Timeout = 5_000
+            ),
+            %% ensure replayq offloads to disk
+            Payload = binary:copy(<<"a">>, 119),
+            lists:foreach(
+                fun(N) ->
+                    spawn_link(fun() ->
+                        {error, _} =
+                            emqx_resource:query(
+                                ?ID,
+                                {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
+                            )
+                    end)
+                end,
+                lists:seq(1, NumRequests)
+            ),
+
+            {ok, _} = snabbkaffe:receive_events(SRef),
+
+            %% ensure that stuff got enqueued into disk
+            tap_metrics(?LINE),
+            Queuing1 = emqx_resource_metrics:queuing_get(?ID),
+            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+            ?assert(Queuing1 > 0),
+            ?assertEqual(2, Inflight1),
+
+            %% now, we delete the resource
+            process_flag(trap_exit, true),
+            ok = emqx_resource:remove_local(?ID),
+            ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
+
+            %% re-create the resource with the *same name*
+            {{ok, _}, {ok, _Events}} =
+                ?wait_async_action(
+                    emqx_resource:create(
+                        ?ID,
+                        ?DEFAULT_RESOURCE_GROUP,
+                        ?TEST_RESOURCE,
+                        #{name => test_resource},
+                        #{
+                            query_mode => async,
+                            batch_size => 1,
+                            worker_pool_size => 2,
+                            queue_seg_bytes => 100,
+                            resume_interval => 1_000
+                        }
+                    ),
+                    #{?snk_kind := resource_worker_enter_running},
+                    5_000
+                ),
+
+            %% it shouldn't have anything enqueued, as it's a fresh resource
+            Queuing2 = emqx_resource_metrics:queuing_get(?ID),
+            Inflight2 = emqx_resource_metrics:queuing_get(?ID),
+            ?assertEqual(0, Queuing2),
+            ?assertEqual(0, Inflight2),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
+%% check that, if we configure a max queue size too small, then we
+%% never send requests and always overflow.
+t_always_overflow(_Config) ->
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 1,
+            worker_pool_size => 1,
+            max_queue_bytes => 1,
+            resume_interval => 1_000
+        }
+    ),
+    ?check_trace(
+        begin
+            Payload = binary:copy(<<"a">>, 100),
+            %% since it's sync and it should never send a request, this
+            %% errors with `timeout'.
+            ?assertError(
+                timeout,
+                emqx_resource:query(
+                    ?ID,
+                    {big_payload, Payload},
+                    #{timeout => 500}
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind(call_query_enter, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_retry_sync_inflight(_Config) ->
+    ResumeInterval = 1_000,
+    emqx_connector_demo:set_callback_mode(always_sync),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 1,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    QueryOpts = #{},
+    ?check_trace(
+        begin
+            %% now really make the resource go into `blocked' state.
+            %% this results in a retriable error when sync.
+            ok = emqx_resource:simple_sync_query(?ID, block),
+            TestPid = self(),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    spawn_link(fun() ->
+                        Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+                        TestPid ! {res, Res}
+                    end),
+                    #{?snk_kind := resource_worker_retry_inflight_failed},
+                    ResumeInterval * 2
+                ),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ok = emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := resource_worker_retry_inflight_succeeded},
+                    ResumeInterval * 3
+                ),
+            receive
+                {res, Res} ->
+                    ?assertEqual(ok, Res)
+            after 5_000 ->
+                ct:fail("no response")
+            end,
+            ok
+        end,
+        [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
+    ),
+    ok.
+
+t_retry_sync_inflight_batch(_Config) ->
+    ResumeInterval = 1_000,
+    emqx_connector_demo:set_callback_mode(always_sync),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => sync,
+            batch_size => 2,
+            batch_time => 200,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    QueryOpts = #{},
+    ?check_trace(
+        begin
+            %% make the resource go into `blocked' state.  this
+            %% results in a retriable error when sync.
+            ok = emqx_resource:simple_sync_query(?ID, block),
+            process_flag(trap_exit, true),
+            TestPid = self(),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    spawn_link(fun() ->
+                        Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
+                        TestPid ! {res, Res}
+                    end),
+                    #{?snk_kind := resource_worker_retry_inflight_failed},
+                    ResumeInterval * 2
+                ),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ok = emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := resource_worker_retry_inflight_succeeded},
+                    ResumeInterval * 3
+                ),
+            receive
+                {res, Res} ->
+                    ?assertEqual(ok, Res)
+            after 5_000 ->
+                ct:fail("no response")
+            end,
+            ok
+        end,
+        [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
+    ),
+    ok.
+
+t_retry_async_inflight(_Config) ->
+    ResumeInterval = 1_000,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 1,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    QueryOpts = #{},
+    ?check_trace(
+        begin
+            %% block
+            ok = emqx_resource:simple_sync_query(?ID, block),
+
+            %% then send an async request; that should be retriable.
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
+                    #{?snk_kind := resource_worker_retry_inflight_failed},
+                    ResumeInterval * 2
+                ),
+
+            %% will reply with success after the resource is healed
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := resource_worker_enter_running},
+                    ResumeInterval * 2
+                ),
+            ok
+        end,
+        [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
+    ),
+    ok.
+
+t_retry_async_inflight_batch(_Config) ->
+    ResumeInterval = 1_000,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 2,
+            batch_time => 200,
+            worker_pool_size => 1,
+            resume_interval => ResumeInterval
+        }
+    ),
+    QueryOpts = #{},
+    ?check_trace(
+        begin
+            %% block
+            ok = emqx_resource:simple_sync_query(?ID, block),
+
+            %% then send an async request; that should be retriable.
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
+                    #{?snk_kind := resource_worker_retry_inflight_failed},
+                    ResumeInterval * 2
+                ),
+
+            %% will reply with success after the resource is healed
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    emqx_resource:simple_sync_query(?ID, resume),
+                    #{?snk_kind := resource_worker_enter_running},
+                    ResumeInterval * 2
+                ),
+            ok
+        end,
+        [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
+    ),
+    ok.
+
+%% check that we monitor async worker pids and abort their inflight
+%% requests if they die.
+t_async_pool_worker_death(_Config) ->
+    ResumeInterval = 1_000,
+    NumBufferWorkers = 2,
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+    {ok, _} = emqx_resource:create(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{
+            query_mode => async,
+            batch_size => 1,
+            worker_pool_size => NumBufferWorkers,
+            resume_interval => ResumeInterval
+        }
+    ),
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    Insert0 = fun(Tab, Ref, Result) ->
+        ct:pal("inserting ~p", [{Ref, Result}]),
+        ets:insert(Tab, {Ref, Result})
+    end,
+    ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
+    ?check_trace(
+        begin
+            ok = emqx_resource:simple_sync_query(?ID, block),
+
+            NumReqs = 10,
+            {ok, SRef0} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}),
+                    NumReqs,
+                    1_000
+                ),
+            inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef0),
+
+            Inflight0 = emqx_resource_metrics:inflight_get(?ID),
+            ?assertEqual(NumReqs, Inflight0),
+
+            %% grab one of the worker pids and kill it
+            {ok, SRef1} =
+                snabbkaffe:subscribe(
+                    ?match_event(#{?snk_kind := resource_worker_worker_down_update}),
+                    NumBufferWorkers,
+                    10_000
+                ),
+            {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
+            MRef = monitor(process, Pid0),
+            ct:pal("will kill ~p", [Pid0]),
+            exit(Pid0, kill),
+            receive
+                {'DOWN', MRef, process, Pid0, killed} ->
+                    ct:pal("~p killed", [Pid0]),
+                    ok
+            after 200 ->
+                ct:fail("worker should have died")
+            end,
+
+            %% inflight requests should have been marked as retriable
+            {ok, _} = snabbkaffe:receive_events(SRef1),
+            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+            ?assertEqual(NumReqs, Inflight1),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Helpers
 %% Helpers
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -1136,6 +1519,30 @@ inc_counter_in_parallel(N, Opts0) ->
         end)
         end)
      || _ <- lists:seq(1, N)
      || _ <- lists:seq(1, N)
     ],
     ],
+    [
+        receive
+            {complete, Pid} -> ok
+        after 1000 ->
+            ct:fail({wait_for_query_timeout, Pid})
+        end
+     || Pid <- Pids
+    ],
+    ok.
+
+inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
+    Parent = self(),
+    Pids = [
+        erlang:spawn(fun() ->
+            Opts =
+                case is_function(Opts0) of
+                    true -> Opts0();
+                    false -> Opts0
+                end,
+            emqx_resource:query(?ID, {inc_counter, M}, Opts),
+            Parent ! {complete, self()}
+        end)
+     || M <- lists:seq(StartN, StartN + N - 1)
+    ],
     [
     [
         receive
         receive
             {complete, Pid} -> ok
             {complete, Pid} -> ok
@@ -1156,3 +1563,43 @@ tap_metrics(Line) ->
     {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
     {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
     ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
     ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
     #{counters => C, gauges => G}.
     #{counters => C, gauges => G}.
+
+assert_sync_retry_fail_then_succeed_inflight(Trace) ->
+    ct:pal("  ~p", [Trace]),
+    ?assert(
+        ?strict_causality(
+            #{?snk_kind := resource_worker_flush_nack, ref := _Ref},
+            #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+            Trace
+        )
+    ),
+    %% not strict causality because it might retry more than once
+    %% before restoring the resource health.
+    ?assert(
+        ?causality(
+            #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+            #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
+            Trace
+        )
+    ),
+    ok.
+
+assert_async_retry_fail_then_succeed_inflight(Trace) ->
+    ct:pal("  ~p", [Trace]),
+    ?assert(
+        ?strict_causality(
+            #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref},
+            #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+            Trace
+        )
+    ),
+    %% not strict causality because it might retry more than once
+    %% before restoring the resource health.
+    ?assert(
+        ?causality(
+            #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
+            #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
+            Trace
+        )
+    ),
+    ok.

+ 0 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -92,7 +92,6 @@ values(common, Protocol, SupportUint, TypeOpts) ->
                 "bool=${payload.bool}">>,
                 "bool=${payload.bool}">>,
         precision => ms,
         precision => ms,
         resource_opts => #{
         resource_opts => #{
-            enable_batch => false,
             batch_size => 100,
             batch_size => 100,
             batch_time => <<"20ms">>
             batch_time => <<"20ms">>
         },
         },

+ 22 - 26
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -108,6 +108,7 @@ end_per_group(_Group, _Config) ->
 init_per_testcase(TestCase, Config0) when
 init_per_testcase(TestCase, Config0) when
     TestCase =:= t_publish_success_batch
     TestCase =:= t_publish_success_batch
 ->
 ->
+    ct:timetrap({seconds, 30}),
     case ?config(batch_size, Config0) of
     case ?config(batch_size, Config0) of
         1 ->
         1 ->
             [{skip_due_to_no_batching, true}];
             [{skip_due_to_no_batching, true}];
@@ -120,6 +121,7 @@ init_per_testcase(TestCase, Config0) when
             [{telemetry_table, Tid} | Config]
             [{telemetry_table, Tid} | Config]
     end;
     end;
 init_per_testcase(TestCase, Config0) ->
 init_per_testcase(TestCase, Config0) ->
+    ct:timetrap({seconds, 30}),
     {ok, _} = start_echo_http_server(),
     {ok, _} = start_echo_http_server(),
     delete_all_bridges(),
     delete_all_bridges(),
     Tid = install_telemetry_handler(TestCase),
     Tid = install_telemetry_handler(TestCase),
@@ -287,6 +289,7 @@ gcp_pubsub_config(Config) ->
             "  pool_size = 1\n"
             "  pool_size = 1\n"
             "  pipelining = ~b\n"
             "  pipelining = ~b\n"
             "  resource_opts = {\n"
             "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
             "    worker_pool_size = 1\n"
             "    worker_pool_size = 1\n"
             "    query_mode = ~s\n"
             "    query_mode = ~s\n"
             "    batch_size = ~b\n"
             "    batch_size = ~b\n"
@@ -512,14 +515,16 @@ install_telemetry_handler(TestCase) ->
 
 
 wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
 wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     Events = receive_all_events(GaugeName, Timeout),
     Events = receive_all_events(GaugeName, Timeout),
-    case lists:last(Events) of
+    case length(Events) > 0 andalso lists:last(Events) of
         #{measurements := #{gauge_set := ExpectedValue}} ->
         #{measurements := #{gauge_set := ExpectedValue}} ->
             ok;
             ok;
         #{measurements := #{gauge_set := Value}} ->
         #{measurements := #{gauge_set := Value}} ->
             ct:fail(
             ct:fail(
                 "gauge ~p didn't reach expected value ~p; last value: ~p",
                 "gauge ~p didn't reach expected value ~p; last value: ~p",
                 [GaugeName, ExpectedValue, Value]
                 [GaugeName, ExpectedValue, Value]
-            )
+            );
+        false ->
+            ct:pal("no ~p gauge events received!", [GaugeName])
     end.
     end.
 
 
 receive_all_events(EventName, Timeout) ->
 receive_all_events(EventName, Timeout) ->
@@ -609,6 +614,8 @@ t_publish_success(Config) ->
         ResourceId,
         ResourceId,
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
     ),
     ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
     assert_metrics(
         #{
         #{
             dropped => 0,
             dropped => 0,
@@ -657,6 +664,8 @@ t_publish_success_local_topic(Config) ->
         ResourceId,
         ResourceId,
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
     ),
     ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
     assert_metrics(
         #{
         #{
             dropped => 0,
             dropped => 0,
@@ -743,6 +752,8 @@ t_publish_templated(Config) ->
         ResourceId,
         ResourceId,
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
         #{n_events => ExpectedInflightEvents, timeout => 5_000}
     ),
     ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
     assert_metrics(
     assert_metrics(
         #{
         #{
             dropped => 0,
             dropped => 0,
@@ -1124,19 +1135,17 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                 ResourceId
                 ResourceId
             );
             );
         {_, sync} ->
         {_, sync} ->
-            wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                timeout => 10_000, n_events => 2
-            }),
             %% even waiting, hard to avoid flakiness... simpler to just sleep
             %% even waiting, hard to avoid flakiness... simpler to just sleep
             %% a bit until stabilization.
             %% a bit until stabilization.
-            ct:sleep(200),
+            wait_until_gauge_is(queuing, 0, 500),
+            wait_until_gauge_is(inflight, 1, 500),
             assert_metrics(
             assert_metrics(
                 #{
                 #{
                     dropped => 0,
                     dropped => 0,
                     failed => 0,
                     failed => 0,
-                    inflight => 0,
+                    inflight => 1,
                     matched => 1,
                     matched => 1,
-                    queuing => 1,
+                    queuing => 0,
                     retried => 0,
                     retried => 0,
                     success => 0
                     success => 0
                 },
                 },
@@ -1264,7 +1273,6 @@ t_failure_no_body(Config) ->
 
 
 t_unrecoverable_error(Config) ->
 t_unrecoverable_error(Config) ->
     ResourceId = ?config(resource_id, Config),
     ResourceId = ?config(resource_id, Config),
-    TelemetryTable = ?config(telemetry_table, Config),
     QueryMode = ?config(query_mode, Config),
     QueryMode = ?config(query_mode, Config),
     TestPid = self(),
     TestPid = self(),
     FailureNoBodyHandler =
     FailureNoBodyHandler =
@@ -1326,26 +1334,14 @@ t_unrecoverable_error(Config) ->
             ok
             ok
         end
         end
     ),
     ),
-    wait_telemetry_event(TelemetryTable, failed, ResourceId),
-    ExpectedInflightEvents =
-        case QueryMode of
-            sync -> 1;
-            async -> 3
-        end,
-    wait_telemetry_event(
-        TelemetryTable,
-        inflight,
-        ResourceId,
-        #{n_events => ExpectedInflightEvents, timeout => 5_000}
-    ),
-    %% even waiting, hard to avoid flakiness... simpler to just sleep
-    %% a bit until stabilization.
-    ct:sleep(200),
+
+    wait_until_gauge_is(queuing, 0, _Timeout = 400),
+    wait_until_gauge_is(inflight, 1, _Timeout = 400),
     assert_metrics(
     assert_metrics(
         #{
         #{
             dropped => 0,
             dropped => 0,
-            failed => 1,
-            inflight => 0,
+            failed => 0,
+            inflight => 1,
             matched => 1,
             matched => 1,
             queuing => 0,
             queuing => 0,
             retried => 0,
             retried => 0,

+ 26 - 12
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl

@@ -778,15 +778,25 @@ t_bad_timestamp(Config) ->
                 {async, false} ->
                 {async, false} ->
                     ?assertEqual(ok, Return),
                     ?assertEqual(ok, Return),
                     ?assertMatch(
                     ?assertMatch(
-                        [#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}],
+                        [
+                            #{
+                                error := [
+                                    {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+                                ]
+                            }
+                        ],
                         ?of_kind(influxdb_connector_send_query_error, Trace)
                         ?of_kind(influxdb_connector_send_query_error, Trace)
                     );
                     );
                 {sync, false} ->
                 {sync, false} ->
                     ?assertEqual(
                     ?assertEqual(
-                        {error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return
+                        {error,
+                            {unrecoverable_error, [
+                                {error, {bad_timestamp, [<<"bad_timestamp">>]}}
+                            ]}},
+                        Return
                     );
                     );
                 {sync, true} ->
                 {sync, true} ->
-                    ?assertEqual({error, points_trans_failed}, Return)
+                    ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return)
             end,
             end,
             ok
             ok
         end
         end
@@ -894,11 +904,19 @@ t_write_failure(Config) ->
     },
     },
     ?check_trace(
     ?check_trace(
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-            send_message(Config, SentData)
+            case QueryMode of
+                sync ->
+                    ?assertError(timeout, send_message(Config, SentData));
+                async ->
+                    ?assertEqual(ok, send_message(Config, SentData))
+            end
         end),
         end),
-        fun(Result, _Trace) ->
+        fun(Trace0) ->
             case QueryMode of
             case QueryMode of
                 sync ->
                 sync ->
+                    Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+                    ?assertMatch([_ | _], Trace),
+                    [#{result := Result} | _] = Trace,
                     ?assert(
                     ?assert(
                         {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
                         {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
                             {error, {error, closed}} =:= Result orelse
                             {error, {error, closed}} =:= Result orelse
@@ -906,7 +924,7 @@ t_write_failure(Config) ->
                         #{got => Result}
                         #{got => Result}
                     );
                     );
                 async ->
                 async ->
-                    ?assertEqual(ok, Result)
+                    ok
             end,
             end,
             ok
             ok
         end
         end
@@ -938,11 +956,7 @@ t_missing_field(Config) ->
         begin
         begin
             emqx:publish(Msg0),
             emqx:publish(Msg0),
             emqx:publish(Msg1),
             emqx:publish(Msg1),
-            NEvents =
-                case IsBatch of
-                    true -> 1;
-                    false -> 2
-                end,
+            NEvents = 1,
             {ok, _} =
             {ok, _} =
                 snabbkaffe:block_until(
                 snabbkaffe:block_until(
                     ?match_n_events(NEvents, #{
                     ?match_n_events(NEvents, #{
@@ -964,7 +978,7 @@ t_missing_field(Config) ->
                     );
                     );
                 false ->
                 false ->
                     ?assertMatch(
                     ?assertMatch(
-                        [#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _],
+                        [#{error := [{error, no_fields}]} | _],
                         ?of_kind(influxdb_connector_send_query_error, Trace)
                         ?of_kind(influxdb_connector_send_query_error, Trace)
                     )
                     )
             end,
             end,

+ 40 - 19
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -module(emqx_ee_bridge_mysql_SUITE).
 -module(emqx_ee_bridge_mysql_SUITE).
@@ -170,6 +170,7 @@ mysql_config(BridgeType, Config) ->
             "  password = ~p\n"
             "  password = ~p\n"
             "  sql = ~p\n"
             "  sql = ~p\n"
             "  resource_opts = {\n"
             "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
             "    batch_size = ~b\n"
             "    batch_size = ~b\n"
             "    query_mode = ~s\n"
             "    query_mode = ~s\n"
             "  }\n"
             "  }\n"
@@ -397,20 +398,32 @@ t_write_failure(Config) ->
     ProxyName = ?config(proxy_name, Config),
     ProxyName = ?config(proxy_name, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
     {ok, _} = create_bridge(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     SentData = #{payload => Val, timestamp => 1668602148000},
     ?check_trace(
     ?check_trace(
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-            send_message(Config, SentData)
+            case QueryMode of
+                sync ->
+                    ?assertError(timeout, send_message(Config, SentData));
+                async ->
+                    send_message(Config, SentData)
+            end
         end),
         end),
-        fun
-            ({error, {resource_error, _}}, _Trace) ->
-                ok;
-            ({error, {recoverable_error, disconnected}}, _Trace) ->
-                ok;
-            (_, _Trace) ->
-                ?assert(false)
+        fun(Trace0) ->
+            ct:pal("trace: ~p", [Trace0]),
+            Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+            ?assertMatch([#{result := {error, _}} | _], Trace),
+            [#{result := {error, Error}} | _] = Trace,
+            case Error of
+                {resource_error, _} ->
+                    ok;
+                {recoverable_error, disconnected} ->
+                    ok;
+                _ ->
+                    ct:fail("unexpected error: ~p", [Error])
+            end
         end
         end
     ),
     ),
     ok.
     ok.
@@ -424,10 +437,10 @@ t_write_timeout(Config) ->
     {ok, _} = create_bridge(Config),
     {ok, _} = create_bridge(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     SentData = #{payload => Val, timestamp => 1668602148000},
-    Timeout = 10,
+    Timeout = 1000,
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch(
-            {error, {resource_error, _}},
+        ?assertError(
+            timeout,
             query_resource(Config, {send_message, SentData, [], Timeout})
             query_resource(Config, {send_message, SentData, [], Timeout})
         )
         )
     end),
     end),
@@ -443,7 +456,7 @@ t_simple_sql_query(Config) ->
     BatchSize = ?config(batch_size, Config),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     IsBatch = BatchSize > 1,
     case IsBatch of
     case IsBatch of
-        true -> ?assertEqual({error, batch_select_not_implemented}, Result);
+        true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
         false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
         false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
     end,
     end,
     ok.
     ok.
@@ -459,10 +472,16 @@ t_missing_data(Config) ->
     case IsBatch of
     case IsBatch of
         true ->
         true ->
             ?assertMatch(
             ?assertMatch(
-                {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result
+                {error,
+                    {unrecoverable_error,
+                        {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
+                Result
             );
             );
         false ->
         false ->
-            ?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result)
+            ?assertMatch(
+                {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
+                Result
+            )
     end,
     end,
     ok.
     ok.
 
 
@@ -476,8 +495,10 @@ t_bad_sql_parameter(Config) ->
     BatchSize = ?config(batch_size, Config),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     IsBatch = BatchSize > 1,
     case IsBatch of
     case IsBatch of
-        true -> ?assertEqual({error, invalid_request}, Result);
-        false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result)
+        true ->
+            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+        false ->
+            ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
     end,
     end,
     ok.
     ok.
 
 
@@ -491,8 +512,8 @@ t_unprepared_statement_query(Config) ->
     BatchSize = ?config(batch_size, Config),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     IsBatch = BatchSize > 1,
     case IsBatch of
     case IsBatch of
-        true -> ?assertEqual({error, invalid_request}, Result);
-        false -> ?assertEqual({error, prepared_statement_invalid}, Result)
+        true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+        false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
     end,
     end,
     ok.
     ok.
 
 

+ 28 - 17
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl

@@ -191,6 +191,7 @@ pgsql_config(BridgeType, Config) ->
             "  password = ~p\n"
             "  password = ~p\n"
             "  sql = ~p\n"
             "  sql = ~p\n"
             "  resource_opts = {\n"
             "  resource_opts = {\n"
+            "    request_timeout = 500ms\n"
             "    batch_size = ~b\n"
             "    batch_size = ~b\n"
             "    query_mode = ~s\n"
             "    query_mode = ~s\n"
             "  }\n"
             "  }\n"
@@ -415,20 +416,32 @@ t_write_failure(Config) ->
     ProxyName = ?config(proxy_name, Config),
     ProxyName = ?config(proxy_name, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
     {ok, _} = create_bridge(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     SentData = #{payload => Val, timestamp => 1668602148000},
     ?check_trace(
     ?check_trace(
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-            send_message(Config, SentData)
+            case QueryMode of
+                sync ->
+                    ?assertError(timeout, send_message(Config, SentData));
+                async ->
+                    send_message(Config, SentData)
+            end
         end),
         end),
-        fun
-            ({error, {resource_error, _}}, _Trace) ->
-                ok;
-            ({error, {recoverable_error, disconnected}}, _Trace) ->
-                ok;
-            (_, _Trace) ->
-                ?assert(false)
+        fun(Trace0) ->
+            ct:pal("trace: ~p", [Trace0]),
+            Trace = ?of_kind(resource_worker_flush_nack, Trace0),
+            ?assertMatch([#{result := {error, _}} | _], Trace),
+            [#{result := {error, Error}} | _] = Trace,
+            case Error of
+                {resource_error, _} ->
+                    ok;
+                disconnected ->
+                    ok;
+                _ ->
+                    ct:fail("unexpected error: ~p", [Error])
+            end
         end
         end
     ),
     ),
     ok.
     ok.
@@ -442,12 +455,9 @@ t_write_timeout(Config) ->
     {ok, _} = create_bridge(Config),
     {ok, _} = create_bridge(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     SentData = #{payload => Val, timestamp => 1668602148000},
-    Timeout = 10,
+    Timeout = 1000,
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch(
-            {error, {resource_error, _}},
-            query_resource(Config, {send_message, SentData, [], Timeout})
-        )
+        ?assertError(timeout, query_resource(Config, {send_message, SentData, [], Timeout}))
     end),
     end),
     ok.
     ok.
 
 
@@ -459,7 +469,7 @@ t_simple_sql_query(Config) ->
     Request = {sql, <<"SELECT count(1) AS T">>},
     Request = {sql, <<"SELECT count(1) AS T">>},
     Result = query_resource(Config, Request),
     Result = query_resource(Config, Request),
     case ?config(enable_batch, Config) of
     case ?config(enable_batch, Config) of
-        true -> ?assertEqual({error, batch_prepare_not_implemented}, Result);
+        true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
         false -> ?assertMatch({ok, _, [{1}]}, Result)
         false -> ?assertMatch({ok, _, [{1}]}, Result)
     end,
     end,
     ok.
     ok.
@@ -471,7 +481,8 @@ t_missing_data(Config) ->
     ),
     ),
     Result = send_message(Config, #{}),
     Result = send_message(Config, #{}),
     ?assertMatch(
     ?assertMatch(
-        {error, {error, error, <<"23502">>, not_null_violation, _, _}}, Result
+        {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}},
+        Result
     ),
     ),
     ok.
     ok.
 
 
@@ -484,10 +495,10 @@ t_bad_sql_parameter(Config) ->
     Result = query_resource(Config, Request),
     Result = query_resource(Config, Request),
     case ?config(enable_batch, Config) of
     case ?config(enable_batch, Config) of
         true ->
         true ->
-            ?assertEqual({error, invalid_request}, Result);
+            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
         false ->
         false ->
             ?assertMatch(
             ?assertMatch(
-                {error, {resource_error, _}}, Result
+                {error, {unrecoverable_error, _}}, Result
             )
             )
     end,
     end,
     ok.
     ok.

+ 1 - 1
lib-ee/emqx_ee_connector/rebar.config

@@ -1,7 +1,7 @@
 {erl_opts, [debug_info]}.
 {erl_opts, [debug_info]}.
 {deps, [
 {deps, [
   {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
   {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
-  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}},
+  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.7"}}},
   {emqx, {path, "../../apps/emqx"}}
   {emqx, {path, "../../apps/emqx"}}
 ]}.
 ]}.
 
 

+ 5 - 4
lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl

@@ -178,7 +178,7 @@ on_query(BridgeId, {send_message, Selected}, State) ->
     {send_message, map()},
     {send_message, map()},
     {ReplyFun :: function(), Args :: list()},
     {ReplyFun :: function(), Args :: list()},
     state()
     state()
-) -> ok.
+) -> {ok, pid()}.
 on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
 on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
     Requests = [{send_message, Selected}],
     Requests = [{send_message, Selected}],
     ?TRACE(
     ?TRACE(
@@ -210,7 +210,7 @@ on_batch_query(BridgeId, Requests, State) ->
     [{send_message, map()}],
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()},
     {ReplyFun :: function(), Args :: list()},
     state()
     state()
-) -> ok.
+) -> {ok, pid()}.
 on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
 on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
     ?TRACE(
     ?TRACE(
         "QUERY_ASYNC",
         "QUERY_ASYNC",
@@ -496,7 +496,7 @@ do_send_requests_sync(State, Requests, ResourceId) ->
     [{send_message, map()}],
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()},
     {ReplyFun :: function(), Args :: list()},
     resource_id()
     resource_id()
-) -> ok.
+) -> {ok, pid()}.
 do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
 do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
     #{
     #{
         pool_name := PoolName,
         pool_name := PoolName,
@@ -531,7 +531,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
         Request,
         Request,
         RequestTimeout,
         RequestTimeout,
         {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
         {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
-    ).
+    ),
+    {ok, Worker}.
 
 
 -spec reply_delegator(
 -spec reply_delegator(
     resource_id(),
     resource_id(),

+ 5 - 5
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -56,13 +56,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
                 #{points => Points, batch => false, mode => sync}
                 #{points => Points, batch => false, mode => sync}
             ),
             ),
             do_query(InstId, Client, Points);
             do_query(InstId, Client, Points);
-        {error, ErrorPoints} = Err ->
+        {error, ErrorPoints} ->
             ?tp(
             ?tp(
                 influxdb_connector_send_query_error,
                 influxdb_connector_send_query_error,
                 #{batch => false, mode => sync, error => ErrorPoints}
                 #{batch => false, mode => sync, error => ErrorPoints}
             ),
             ),
             log_error_points(InstId, ErrorPoints),
             log_error_points(InstId, ErrorPoints),
-            Err
+            {error, {unrecoverable_error, ErrorPoints}}
     end.
     end.
 
 
 %% Once a Batched Data trans to points failed.
 %% Once a Batched Data trans to points failed.
@@ -80,7 +80,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
                 influxdb_connector_send_query_error,
                 influxdb_connector_send_query_error,
                 #{batch => true, mode => sync, error => Reason}
                 #{batch => true, mode => sync, error => Reason}
             ),
             ),
-            {error, Reason}
+            {error, {unrecoverable_error, Reason}}
     end.
     end.
 
 
 on_query_async(
 on_query_async(
@@ -123,7 +123,7 @@ on_batch_query_async(
                 influxdb_connector_send_query_error,
                 influxdb_connector_send_query_error,
                 #{batch => true, mode => async, error => Reason}
                 #{batch => true, mode => async, error => Reason}
             ),
             ),
-            {error, Reason}
+            {error, {unrecoverable_error, Reason}}
     end.
     end.
 
 
 on_get_status(_InstId, #{client := Client}) ->
 on_get_status(_InstId, #{client := Client}) ->
@@ -356,7 +356,7 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
         connector => InstId,
         connector => InstId,
         points => Points
         points => Points
     }),
     }),
-    ok = influxdb:write_async(Client, Points, ReplyFunAndArgs).
+    {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs).
 
 
 %% -------------------------------------------------------------------------------------------------
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Config Trans
 %% Tags & Fields Config Trans

+ 2 - 2
mix.exs

@@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
-      {:replayq, github: "emqx/replayq", tag: "0.3.5", override: true},
+      {:replayq, github: "emqx/replayq", tag: "0.3.6", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
       {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.2", override: true},
       {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
       {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
@@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
     [
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
       {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
-      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true},
+      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true},
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},

+ 1 - 1
rebar.config

@@ -60,7 +60,7 @@
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
-    , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.5"}}}
+    , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.6"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.2"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}