Explorar el Código

Merge pull request #8596 from terry-xiaoyu/resource_msg_queue

Message Queuing and Batching in emqx-resource
Xinyu Liu hace 3 años
padre
commit
82a54b17fa
Se han modificado 35 ficheros con 1253 adiciones y 414 borrados
  1. 1 1
      apps/emqx/src/emqx.app.src
  2. 1 1
      apps/emqx_authn/src/emqx_authn.app.src
  3. 8 7
      apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl
  4. 2 2
      apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl
  5. 1 1
      apps/emqx_authz/src/emqx_authz.app.src
  6. 2 2
      apps/emqx_authz/src/emqx_authz_mongodb.erl
  7. 2 0
      apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl
  8. 66 50
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  9. 1 1
      apps/emqx_connector/src/emqx_connector.app.src
  10. 15 17
      apps/emqx_connector/src/emqx_connector_http.erl
  11. 7 5
      apps/emqx_connector/src/emqx_connector_ldap.erl
  12. 6 7
      apps/emqx_connector/src/emqx_connector_mongo.erl
  13. 11 8
      apps/emqx_connector/src/emqx_connector_mqtt.erl
  14. 9 11
      apps/emqx_connector/src/emqx_connector_mysql.erl
  15. 9 7
      apps/emqx_connector/src/emqx_connector_pgsql.erl
  16. 7 5
      apps/emqx_connector/src/emqx_connector_redis.erl
  17. 5 5
      apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl
  18. 1 1
      apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
  19. 1 1
      apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
  20. 1 1
      apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
  21. 1 1
      apps/emqx_resource/README.md
  22. 15 8
      apps/emqx_resource/include/emqx_resource.hrl
  23. 2 3
      apps/emqx_resource/include/emqx_resource_utils.hrl
  24. 1 1
      apps/emqx_resource/src/emqx_resource.app.src
  25. 42 59
      apps/emqx_resource/src/emqx_resource.erl
  26. 46 25
      apps/emqx_resource/src/emqx_resource_manager.erl
  27. 11 6
      apps/emqx_resource/src/emqx_resource_sup.erl
  28. 17 0
      apps/emqx_resource/src/emqx_resource_utils.erl
  29. 449 0
      apps/emqx_resource/src/emqx_resource_worker.erl
  30. 136 0
      apps/emqx_resource/src/emqx_resource_worker_sup.erl
  31. 162 0
      apps/emqx_resource/test/emqx_connector_demo.erl
  32. 192 45
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  33. 0 110
      apps/emqx_resource/test/emqx_test_resource.erl
  34. 14 15
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl
  35. 9 8
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -3,7 +3,7 @@
     {id, "emqx"},
     {description, "EMQX Core"},
     % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {registered, []},
     {applications, [

+ 1 - 1
apps/emqx_authn/src/emqx_authn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authn, [
     {description, "EMQX Authentication"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {modules, []},
     {registered, [emqx_authn_sup, emqx_authn_registry]},
     {applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},

+ 8 - 7
apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl

@@ -22,15 +22,18 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2,
     connect/1
 ]).
 
 -define(DEFAULT_POOL_SIZE, 8).
 
+callback_mode() -> always_sync.
+
 on_start(InstId, Opts) ->
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     PoolOpts = [
@@ -45,7 +48,7 @@ on_start(InstId, Opts) ->
 on_stop(_InstId, #{pool_name := PoolName}) ->
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) ->
+on_query(InstId, get_jwks, #{pool_name := PoolName}) ->
     Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover),
     case Result of
         {error, Reason} ->
@@ -54,20 +57,18 @@ on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) ->
                 connector => InstId,
                 command => get_jwks,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result;
-on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
+on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) ->
     lists:foreach(
         fun({_, Worker}) ->
             ok = ecpool_worker:exec(Worker, {emqx_authn_jwks_client, update, [Opts]}, infinity)
         end,
         ecpool:workers(PoolName)
     ),
-    emqx_resource:query_success(AfterQuery),
     ok.
 
 on_get_status(_InstId, #{pool_name := PoolName}) ->

+ 2 - 2
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -164,7 +164,7 @@ authenticate(
 ) ->
     Filter = emqx_authn_utils:render_deep(FilterTemplate, Credential),
     case emqx_resource:query(ResourceId, {find_one, Collection, Filter, #{}}) of
-        undefined ->
+        {ok, undefined} ->
             ignore;
         {error, Reason} ->
             ?TRACE_AUTHN_PROVIDER(error, "mongodb_query_failed", #{
@@ -174,7 +174,7 @@ authenticate(
                 reason => Reason
             }),
             ignore;
-        Doc ->
+        {ok, Doc} ->
             case check_password(Password, Doc, State) of
                 ok ->
                     {ok, is_superuser(Doc, State)};

+ 1 - 1
apps/emqx_authz/src/emqx_authz.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authz, [
     {description, "An OTP application"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {mod, {emqx_authz_app, []}},
     {applications, [

+ 2 - 2
apps/emqx_authz/src/emqx_authz_mongodb.erl

@@ -92,9 +92,9 @@ authorize(
                 resource_id => ResourceID
             }),
             nomatch;
-        [] ->
+        {ok, []} ->
             nomatch;
-        Rows ->
+        {ok, Rows} ->
             Rules = [
                 emqx_authz_rule:compile({Permission, all, Action, Topics})
              || #{

+ 2 - 0
apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl

@@ -23,6 +23,8 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
+suite() -> [{timetrap, {seconds, 60}}].
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 

+ 66 - 50
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -24,7 +24,7 @@
 -include_lib("common_test/include/ct.hrl").
 -define(CONF_DEFAULT, <<"bridges: {}">>).
 -define(BRIDGE_TYPE, <<"webhook">>).
--define(BRIDGE_NAME, <<"test_bridge">>).
+-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
 -define(URL(PORT, PATH),
     list_to_binary(
         io_lib:format(
@@ -78,8 +78,12 @@ set_special_configs(_) ->
 
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    Config.
-end_per_testcase(_, _Config) ->
+    {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
+    [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
+end_per_testcase(_, Config) ->
+    Sock = ?config(sock, Config),
+    Acceptor = ?config(acceptor, Config),
+    stop_http_server(Sock, Acceptor),
     clear_resources(),
     ok.
 
@@ -95,31 +99,39 @@ clear_resources() ->
 %% HTTP server for testing
 %%------------------------------------------------------------------------------
 start_http_server(HandleFun) ->
+    process_flag(trap_exit, true),
     Parent = self(),
-    spawn_link(fun() ->
-        {Port, Sock} = listen_on_random_port(),
-        Parent ! {port, Port},
-        loop(Sock, HandleFun, Parent)
+    {Port, Sock} = listen_on_random_port(),
+    Acceptor = spawn_link(fun() ->
+        accept_loop(Sock, HandleFun, Parent)
     end),
-    receive
-        {port, Port} -> Port
-    after 2000 -> error({timeout, start_http_server})
-    end.
+    timer:sleep(100),
+    {Port, Sock, Acceptor}.
+
+stop_http_server(Sock, Acceptor) ->
+    exit(Acceptor, kill),
+    gen_tcp:close(Sock).
 
 listen_on_random_port() ->
     Min = 1024,
     Max = 65000,
+    rand:seed(exsplus, erlang:timestamp()),
     Port = rand:uniform(Max - Min) + Min,
-    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of
+    case
+        gen_tcp:listen(Port, [
+            binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}
+        ])
+    of
         {ok, Sock} -> {Port, Sock};
         {error, eaddrinuse} -> listen_on_random_port()
     end.
 
-loop(Sock, HandleFun, Parent) ->
+accept_loop(Sock, HandleFun, Parent) ->
+    process_flag(trap_exit, true),
     {ok, Conn} = gen_tcp:accept(Sock),
-    Handler = spawn(fun() -> HandleFun(Conn, Parent) end),
+    Handler = spawn_link(fun() -> HandleFun(Conn, Parent) end),
     gen_tcp:controlling_process(Conn, Handler),
-    loop(Sock, HandleFun, Parent).
+    accept_loop(Sock, HandleFun, Parent).
 
 make_response(CodeStr, Str) ->
     B = iolist_to_binary(Str),
@@ -138,7 +150,9 @@ handle_fun_200_ok(Conn, Parent) ->
             Parent ! {http_server, received, Req},
             gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
             handle_fun_200_ok(Conn, Parent);
-        {error, closed} ->
+        {error, Reason} ->
+            ct:pal("the http handler recv error: ~p", [Reason]),
+            timer:sleep(100),
             gen_tcp:close(Conn)
     end.
 
@@ -153,24 +167,25 @@ parse_http_request(ReqStr0) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_http_crud_apis(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/2),
+t_http_crud_apis(Config) ->
+    Port = ?config(port, Config),
     %% assert that there are no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     %% then we add a webhook bridge, using POST
     %% POST /bridges/ will create a bridge
     URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
 
     %ct:pal("---bridge: ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := _,
         <<"node_status">> := [_ | _],
@@ -179,7 +194,7 @@ t_http_crud_apis(_) ->
         <<"url">> := URL1
     } = jsx:decode(Bridge),
 
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% send a message to emqx and the message should be forwarded to the HTTP server
     Body = <<"my msg">>,
     emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
@@ -203,12 +218,12 @@ t_http_crud_apis(_) ->
     {ok, 200, Bridge2} = request(
         put,
         uri(["bridges", BridgeID]),
-        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
     ),
     ?assertMatch(
         #{
             <<"type">> := ?BRIDGE_TYPE,
-            <<"name">> := ?BRIDGE_NAME,
+            <<"name">> := Name,
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
@@ -225,7 +240,7 @@ t_http_crud_apis(_) ->
         [
             #{
                 <<"type">> := ?BRIDGE_TYPE,
-                <<"name">> := ?BRIDGE_NAME,
+                <<"name">> := Name,
                 <<"enable">> := true,
                 <<"status">> := _,
                 <<"node_status">> := [_ | _],
@@ -242,7 +257,7 @@ t_http_crud_apis(_) ->
     ?assertMatch(
         #{
             <<"type">> := ?BRIDGE_TYPE,
-            <<"name">> := ?BRIDGE_NAME,
+            <<"name">> := Name,
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
@@ -275,7 +290,7 @@ t_http_crud_apis(_) ->
     {ok, 404, ErrMsg2} = request(
         put,
         uri(["bridges", BridgeID]),
-        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
     ),
     ?assertMatch(
         #{
@@ -286,29 +301,28 @@ t_http_crud_apis(_) ->
     ),
     ok.
 
-t_start_stop_bridges(_) ->
-    lists:foreach(
-        fun(Type) ->
-            do_start_stop_bridges(Type)
-        end,
-        [node, cluster]
-    ).
+t_start_stop_bridges_node(Config) ->
+    do_start_stop_bridges(node, Config).
+
+t_start_stop_bridges_cluster(Config) ->
+    do_start_stop_bridges(cluster, Config).
 
-do_start_stop_bridges(Type) ->
+do_start_stop_bridges(Type, Config) ->
     %% assert that there are no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
+    Name = atom_to_binary(Type),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -316,11 +330,11 @@ do_start_stop_bridges(Type) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% stop it
     {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
-    ?assertMatch(#{<<"status">> := <<"disconnected">>}, jsx:decode(Bridge2)),
+    ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% start again
     {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
@@ -339,21 +353,22 @@ do_start_stop_bridges(Type) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
-t_enable_disable_bridges(_) ->
+t_enable_disable_bridges(Config) ->
     %% assert that there are no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Name = ?BRIDGE_NAME,
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -361,11 +376,11 @@ t_enable_disable_bridges(_) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% disable it
     {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
-    ?assertMatch(#{<<"status">> := <<"disconnected">>}, jsx:decode(Bridge2)),
+    ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% enable again
     {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
@@ -391,21 +406,22 @@ t_enable_disable_bridges(_) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
-t_reset_bridges(_) ->
+t_reset_bridges(Config) ->
     %% assert that there are no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Name = ?BRIDGE_NAME,
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -413,7 +429,7 @@ t_reset_bridges(_) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
 
     %% delete the bridge

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "An OTP application"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 15 - 17
apps/emqx_connector/src/emqx_connector_http.erl

@@ -26,9 +26,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -164,6 +165,8 @@ ref(Field) -> hoconsc:ref(?MODULE, Field).
 
 %% ===================================================================
 
+callback_mode() -> always_sync.
+
 on_start(
     InstId,
     #{
@@ -225,7 +228,7 @@ on_stop(InstId, #{pool_name := PoolName}) ->
     }),
     ehttpc_sup:stop_pool(PoolName).
 
-on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
+on_query(InstId, {send_message, Msg}, State) ->
     case maps:get(request, State, undefined) of
         undefined ->
             ?SLOG(error, #{msg => "request_not_found", connector => InstId});
@@ -241,18 +244,16 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
             on_query(
                 InstId,
                 {undefined, Method, {Path, Headers, Body}, Timeout, Retry},
-                AfterQuery,
                 State
             )
     end;
-on_query(InstId, {Method, Request}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
-on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
+on_query(InstId, {Method, Request}, State) ->
+    on_query(InstId, {undefined, Method, Request, 5000, 2}, State);
+on_query(InstId, {Method, Request, Timeout}, State) ->
+    on_query(InstId, {undefined, Method, Request, Timeout, 2}, State);
 on_query(
     InstId,
     {KeyOrNum, Method, Request, Timeout, Retry},
-    AfterQuery,
     #{pool_name := PoolName, base_path := BasePath} = State
 ) ->
     ?TRACE(
@@ -275,32 +276,29 @@ on_query(
     of
         {error, Reason} ->
             ?SLOG(error, #{
-                msg => "http_connector_do_reqeust_failed",
+                msg => "http_connector_do_request_failed",
                 request => NRequest,
                 reason => Reason,
                 connector => InstId
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            emqx_resource:query_success(AfterQuery);
+            ok;
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            emqx_resource:query_success(AfterQuery);
+            ok;
         {ok, StatusCode, _} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         {ok, StatusCode, _, _} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            }),
-            emqx_resource:query_failed(AfterQuery)
+            })
     end,
     Result.
 

+ 7 - 5
apps/emqx_connector/src/emqx_connector_ldap.erl

@@ -25,9 +25,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -42,6 +43,8 @@ roots() ->
 fields(_) -> [].
 
 %% ===================================================================
+callback_mode() -> always_sync.
+
 on_start(
     InstId,
     #{
@@ -99,7 +102,7 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
+on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = State) ->
     Request = {Base, Filter, Attributes},
     ?TRACE(
         "QUERY",
@@ -119,10 +122,9 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P
                 request => Request,
                 connector => InstId,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result.
 

+ 6 - 7
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -25,9 +25,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -139,6 +140,8 @@ mongo_fields() ->
 
 %% ===================================================================
 
+callback_mode() -> always_sync.
+
 on_start(
     InstId,
     Config = #{
@@ -189,7 +192,6 @@ on_stop(InstId, #{poolname := PoolName}) ->
 on_query(
     InstId,
     {Action, Collection, Filter, Projector},
-    AfterQuery,
     #{poolname := PoolName} = State
 ) ->
     Request = {Action, Collection, Filter, Projector},
@@ -212,14 +214,11 @@ on_query(
                 reason => Reason,
                 connector => InstId
             }),
-            emqx_resource:query_failed(AfterQuery),
             {error, Reason};
         {ok, Cursor} when is_pid(Cursor) ->
-            emqx_resource:query_success(AfterQuery),
-            mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000);
+            {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
         Result ->
-            emqx_resource:query_success(AfterQuery),
-            Result
+            {ok, Result}
     end.
 
 -dialyzer({nowarn_function, [on_get_status/2]}).

+ 11 - 8
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -24,6 +24,7 @@
 
 %% API and callbacks for supervisor
 -export([
+    callback_mode/0,
     start_link/0,
     init/1,
     create_bridge/1,
@@ -37,7 +38,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -133,11 +134,14 @@ drop_bridge(Name) ->
 %% ===================================================================
 %% When this bridge is used as a data source, ?MODULE:on_message_received will be called
 %% if the bridge received msgs from the remote broker.
-on_message_received(Msg, HookPoint, InstId) ->
-    _ = emqx_resource:query(InstId, {message_received, Msg}),
+on_message_received(Msg, HookPoint, ResId) ->
+    emqx_resource:inc_matched(ResId),
+    emqx_resource:inc_success(ResId),
     emqx:run_hook(HookPoint, [Msg]).
 
 %% ===================================================================
+callback_mode() -> always_sync.
+
 on_start(InstId, Conf) ->
     InstanceId = binary_to_atom(InstId, utf8),
     ?SLOG(info, #{
@@ -181,12 +185,10 @@ on_stop(_InstId, #{name := InstanceId}) ->
             })
     end.
 
-on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) ->
-    emqx_resource:query_success(AfterQuery);
-on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
+on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
     emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
-    emqx_resource:query_success(AfterQuery).
+    ok.
 
 on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
     AutoReconn = maps:get(auto_reconnect, Conf, true),
@@ -207,11 +209,12 @@ make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 ->
 make_sub_confs(undefined, _) ->
     undefined;
 make_sub_confs(SubRemoteConf, InstId) ->
+    ResId = emqx_resource_manager:manager_id_to_resource_id(InstId),
     case maps:take(hookpoint, SubRemoteConf) of
         error ->
             SubRemoteConf;
         {HookPoint, SubConf} ->
-            MFA = {?MODULE, on_message_received, [HookPoint, InstId]},
+            MFA = {?MODULE, on_message_received, [HookPoint, ResId]},
             SubConf#{on_message_received => MFA}
     end.
 

+ 9 - 11
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -24,9 +24,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -73,6 +74,8 @@ server(desc) -> ?DESC("server");
 server(_) -> undefined.
 
 %% ===================================================================
+callback_mode() -> always_sync.
+
 -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
 on_start(
     InstId,
@@ -122,14 +125,13 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {TypeOrKey, SQLOrKey}, AfterQuery, State) ->
-    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, AfterQuery, State);
-on_query(InstId, {TypeOrKey, SQLOrKey, Params}, AfterQuery, State) ->
-    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, AfterQuery, State);
+on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
+on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
 on_query(
     InstId,
     {TypeOrKey, SQLOrKey, Params, Timeout},
-    AfterQuery,
     #{poolname := PoolName, prepare_statement := Prepares} = State
 ) ->
     LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
@@ -147,7 +149,6 @@ on_query(
             ),
             %% kill the pool worker to trigger reconnection
             _ = exit(Conn, restart),
-            emqx_resource:query_failed(AfterQuery),
             Result;
         {error, not_prepared} ->
             ?SLOG(
@@ -157,13 +158,12 @@ on_query(
             case prepare_sql(Prepares, PoolName) of
                 ok ->
                     %% not return result, next loop will try again
-                    on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, AfterQuery, State);
+                    on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
                 {error, Reason} ->
                     ?SLOG(
                         error,
                         LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
                     ),
-                    emqx_resource:query_failed(AfterQuery),
                     {error, Reason}
             end;
         {error, Reason} ->
@@ -171,10 +171,8 @@ on_query(
                 error,
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
             ),
-            emqx_resource:query_failed(AfterQuery),
             Result;
         _ ->
-            emqx_resource:query_success(AfterQuery),
             Result
     end.
 

+ 9 - 7
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -27,9 +27,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -66,6 +67,8 @@ server(desc) -> ?DESC("server");
 server(_) -> undefined.
 
 %% ===================================================================
+callback_mode() -> always_sync.
+
 on_start(
     InstId,
     #{
@@ -116,9 +119,9 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State);
-on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
+on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {Type, NameOrSQL, []}, State);
+on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) ->
     ?SLOG(debug, #{
         msg => "postgresql connector received sql query",
         connector => InstId,
@@ -132,10 +135,9 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
                 connector => InstId,
                 sql => NameOrSQL,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result.
 

+ 7 - 5
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -26,9 +26,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -112,6 +113,8 @@ servers(desc) -> ?DESC("servers");
 servers(_) -> undefined.
 
 %% ===================================================================
+callback_mode() -> always_sync.
+
 on_start(
     InstId,
     #{
@@ -177,7 +180,7 @@ on_stop(InstId, #{poolname := PoolName, type := Type}) ->
         _ -> emqx_plugin_libs_pool:stop_pool(PoolName)
     end.
 
-on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
+on_query(InstId, {cmd, Command}, #{poolname := PoolName, type := Type} = State) ->
     ?TRACE(
         "QUERY",
         "redis_connector_received",
@@ -195,10 +198,9 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T
                 connector => InstId,
                 sql => Command,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterCommand);
+            });
         _ ->
-            emqx_resource:query_success(AfterCommand)
+            ok
     end,
     Result.
 

+ 5 - 5
apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl

@@ -85,8 +85,8 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
     % % Perform query as further check that the resource is working as expected
-    ?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
-    ?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
+    ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
+    ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
     ?assertEqual(ok, emqx_resource:stop(PoolName)),
     % Resource will be listed still, but state will be changed and healthcheck will fail
     % as the worker no longer exists.
@@ -95,7 +95,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         status := StoppedStatus
     }} =
         emqx_resource:get_instance(PoolName),
-    ?assertEqual(StoppedStatus, disconnected),
+    ?assertEqual(stopped, StoppedStatus),
     ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
@@ -108,8 +108,8 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
-    ?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
-    ?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
+    ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
+    ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
     % Stop and remove the resource in one go.
     ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

+ 1 - 1
apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl

@@ -101,7 +101,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         status := StoppedStatus
     }} =
         emqx_resource:get_instance(PoolName),
-    ?assertEqual(StoppedStatus, disconnected),
+    ?assertEqual(stopped, StoppedStatus),
     ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

+ 1 - 1
apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl

@@ -95,7 +95,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         status := StoppedStatus
     }} =
         emqx_resource:get_instance(PoolName),
-    ?assertEqual(StoppedStatus, disconnected),
+    ?assertEqual(stopped, StoppedStatus),
     ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

+ 1 - 1
apps/emqx_connector/test/emqx_connector_redis_SUITE.erl

@@ -117,7 +117,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
         status := StoppedStatus
     }} =
         emqx_resource:get_instance(PoolName),
-    ?assertEqual(StoppedStatus, disconnected),
+    ?assertEqual(stopped, StoppedStatus),
     ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

+ 1 - 1
apps/emqx_resource/README.md

@@ -14,5 +14,5 @@ the config operations (like config validation, config dump back to files), and t
 And we put all the `specific` codes to the callback modules.
 
 See
-* `test/emqx_test_resource.erl` for a minimal `emqx_resource` implementation;
+* `test/emqx_connector_demo.erl` for a minimal `emqx_resource` implementation;
 * `test/emqx_resource_SUITE.erl` for examples of `emqx_resource` usage.

+ 15 - 8
apps/emqx_resource/include/emqx_resource.hrl

@@ -21,10 +21,18 @@
 -type resource_config() :: term().
 -type resource_spec() :: map().
 -type resource_state() :: term().
--type resource_status() :: connected | disconnected | connecting.
+-type resource_status() :: connected | disconnected | connecting | stopped.
+-type callback_mode() :: always_sync | async_if_possible.
+-type result() :: term().
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
+-type query_opts() :: #{
+    %% The key used for picking a resource worker
+    pick_key => term()
+}.
 -type resource_data() :: #{
     id := resource_id(),
     mod := module(),
+    callback_mode := callback_mode(),
     config := resource_config(),
     state := resource_state(),
     status := resource_status(),
@@ -45,12 +53,11 @@
     %% periodically.
     auto_retry_interval => integer()
 }.
--type after_query() ::
-    {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]}
-    | undefined.
-
-%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
-%% actions upon query failure
--type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
+-type query_result() ::
+    ok
+    | {ok, term()}
+    | {error, term()}
+    | {resource_down, term()}.
 
 -define(TEST_ID_PREFIX, "_test_:").
+-define(RES_METRICS, resource_metrics).

+ 2 - 3
apps/emqx_resource/include/emqx_resource_utils.hrl

@@ -15,7 +15,7 @@
 %%--------------------------------------------------------------------
 
 -define(SAFE_CALL(_EXP_),
-    ?SAFE_CALL(_EXP_, ok)
+    ?SAFE_CALL(_EXP_, {error, {_EXCLASS_, _EXCPTION_, _ST_}})
 ).
 
 -define(SAFE_CALL(_EXP_, _EXP_ON_FAIL_),
@@ -24,8 +24,7 @@
             (_EXP_)
         catch
             _EXCLASS_:_EXCPTION_:_ST_ ->
-                _EXP_ON_FAIL_,
-                {error, {_EXCLASS_, _EXCPTION_, _ST_}}
+                _EXP_ON_FAIL_
         end
     end()
 ).

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 42 - 59
apps/emqx_resource/src/emqx_resource.erl

@@ -23,13 +23,6 @@
 
 -export([list_types/0]).
 
-%% APIs for behaviour implementations
-
--export([
-    query_success/1,
-    query_failed/1
-]).
-
 %% APIs for instances
 
 -export([
@@ -83,14 +76,17 @@
     stop/1,
     %% query the instance
     query/2,
-    %% query the instance with after_query()
-    query/3
+    %% query the instance without batching and queuing messages.
+    simple_sync_query/2,
+    simple_async_query/3
 ]).
 
 %% Direct calls to the callback module
 
-%% start the instance
 -export([
+    %% get the callback mode of a specific module
+    get_callback_mode/1,
+    %% start the instance
     call_start/3,
     %% verify if the resource is working normally
     call_health_check/3,
@@ -111,8 +107,11 @@
     list_group_instances/1
 ]).
 
+-export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]).
+
 -optional_callbacks([
-    on_query/4,
+    on_query/3,
+    on_batch_query/3,
     on_get_status/2
 ]).
 
@@ -124,7 +123,9 @@
 -callback on_stop(resource_id(), resource_state()) -> term().
 
 %% when calling emqx_resource:query/3
--callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term().
+-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
+
+-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
 %% when calling emqx_resource:health_check/2
 -callback on_get_status(resource_id(), resource_state()) ->
@@ -148,22 +149,6 @@ is_resource_mod(Module) ->
             proplists:get_value(behaviour, Info, []),
     lists:member(?MODULE, Behaviour).
 
--spec query_success(after_query()) -> ok.
-query_success(undefined) -> ok;
-query_success({OnSucc, _}) -> apply_query_after_calls(OnSucc).
-
--spec query_failed(after_query()) -> ok.
-query_failed(undefined) -> ok;
-query_failed({_, OnFailed}) -> apply_query_after_calls(OnFailed).
-
-apply_query_after_calls(Funcs) ->
-    lists:foreach(
-        fun({Fun, Args}) ->
-            safe_apply(Fun, Args)
-        end,
-        Funcs
-    ).
-
 %% =================================================================================
 %% APIs for resource instances
 %% =================================================================================
@@ -243,29 +228,20 @@ reset_metrics(ResId) ->
 %% =================================================================================
 -spec query(resource_id(), Request :: term()) -> Result :: term().
 query(ResId, Request) ->
-    query(ResId, Request, inc_metrics_funcs(ResId)).
-
-%% same to above, also defines what to do when the Module:on_query success or failed
-%% it is the duty of the Module to apply the `after_query()` functions.
--spec query(resource_id(), Request :: term(), after_query()) -> Result :: term().
-query(ResId, Request, AfterQuery) ->
-    case emqx_resource_manager:ets_lookup(ResId) of
-        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            %% the resource state is readonly to Module:on_query/4
-            %% and the `after_query()` functions should be thread safe
-            ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched),
-            try
-                Mod:on_query(ResId, Request, AfterQuery, ResourceState)
-            catch
-                Err:Reason:ST ->
-                    emqx_metrics_worker:inc(resource_metrics, ResId, exception),
-                    erlang:raise(Err, Reason, ST)
-            end;
-        {ok, _Group, _Data} ->
-            query_error(not_connected, <<"resource not connected">>);
-        {error, not_found} ->
-            query_error(not_found, <<"resource not found">>)
-    end.
+    query(ResId, Request, #{}).
+
+-spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) ->
+    Result :: term().
+query(ResId, Request, Opts) ->
+    emqx_resource_worker:query(ResId, Request, Opts).
+
+-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
+simple_sync_query(ResId, Request) ->
+    emqx_resource_worker:simple_sync_query(ResId, Request).
+
+-spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term().
+simple_async_query(ResId, Request, ReplyFun) ->
+    emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun).
 
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 start(ResId) ->
@@ -322,6 +298,10 @@ generate_id(Name) when is_binary(Name) ->
 -spec list_group_instances(resource_group()) -> [resource_id()].
 list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 
+-spec get_callback_mode(module()) -> callback_mode().
+get_callback_mode(Mod) ->
+    Mod:callback_mode().
+
 -spec call_start(manager_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 call_start(MgrId, Mod, Config) ->
@@ -429,16 +409,19 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
+inc_matched(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, matched).
+
+inc_success(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
+
+inc_failed(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, failed).
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
 inc_metrics_funcs(ResId) ->
-    OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}],
-    OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}],
+    OnSucc = [{fun ?MODULE:inc_success/1, ResId}],
+    OnFailed = [{fun ?MODULE:inc_failed/1, ResId}],
     {OnSucc, OnFailed}.
-
-safe_apply(Func, Args) ->
-    ?SAFE_CALL(erlang:apply(Func, Args)).
-
-query_error(Reason, Msg) ->
-    {error, {?MODULE, #{reason => Reason, msg => Msg}}}.

+ 46 - 25
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -38,8 +38,12 @@
     list_group/1,
     ets_lookup/1,
     get_metrics/1,
-    reset_metrics/1,
-    set_resource_status_connecting/1
+    reset_metrics/1
+]).
+
+-export([
+    set_resource_status_connecting/1,
+    manager_id_to_resource_id/1
 ]).
 
 % Server
@@ -49,11 +53,12 @@
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
 % State record
--record(data, {id, manager_id, group, mod, config, opts, status, state, error}).
+-record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
+-type data() :: #data{}.
 
 -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
 -define(HEALTHCHECK_INTERVAL, 15000).
--define(ETS_TABLE, emqx_resource_manager).
+-define(ETS_TABLE, ?MODULE).
 -define(WAIT_FOR_RESOURCE_DELAY, 100).
 -define(T_OPERATION, 5000).
 -define(T_LOOKUP, 1000).
@@ -64,6 +69,13 @@
 %% API
 %%------------------------------------------------------------------------------
 
+make_manager_id(ResId) ->
+    emqx_resource:generate_id(ResId).
+
+manager_id_to_resource_id(MgrId) ->
+    [ResId, _Index] = string:split(MgrId, ":", trailing),
+    ResId.
+
 %% @doc Called from emqx_resource when starting a resource instance.
 %%
 %% Triggers the emqx_resource_manager_sup supervisor to actually create
@@ -109,14 +121,17 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     % The state machine will make the actual call to the callback/resource module after init
     ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts),
     ok = emqx_metrics_worker:create_metrics(
-        resource_metrics,
+        ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception],
+        [matched, success, failed, exception, resource_down],
         [matched]
     ),
+    ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
     case maps:get(start_after_created, Opts, true) of
-        true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
-        false -> ok
+        true ->
+            wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
+        false ->
+            ok
     end,
     ok.
 
@@ -207,12 +222,12 @@ ets_lookup(ResId) ->
 
 %% @doc Get the metrics for the specified resource
 get_metrics(ResId) ->
-    emqx_metrics_worker:get_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:get_metrics(?RES_METRICS, ResId).
 
 %% @doc Reset the metrics for the specified resource
 -spec reset_metrics(resource_id()) -> ok.
 reset_metrics(ResId) ->
-    emqx_metrics_worker:reset_metrics(resource_metrics, ResId).
+    emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
 
 %% @doc Returns the data for all resources
 -spec list_all() -> [resource_data()] | [].
@@ -245,6 +260,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         manager_id = MgrId,
         group = Group,
         mod = ResourceType,
+        callback_mode = emqx_resource:get_callback_mode(ResourceType),
         config = Config,
         opts = Opts,
         status = connecting,
@@ -298,8 +314,7 @@ handle_event({call, From}, stop, stopped, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
 handle_event({call, From}, stop, _State, Data) ->
     Result = stop_resource(Data),
-    UpdatedData = Data#data{status = disconnected},
-    {next_state, stopped, UpdatedData, [{reply, From, Result}]};
+    {next_state, stopped, Data, [{reply, From, Result}]};
 % Called when a resource is to be stopped and removed.
 handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
     handle_remove_event(From, ClearMetrics, Data);
@@ -315,9 +330,10 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manually_health_check(From, Data);
 % State: CONNECTING
 handle_event(enter, _OldState, connecting, Data) ->
+    UpdatedData = Data#data{status = connecting},
     insert_cache(Data#data.id, Data#data.group, Data),
     Actions = [{state_timeout, 0, health_check}],
-    {keep_state_and_data, Actions};
+    {keep_state, UpdatedData, Actions};
 handle_event(internal, start_resource, connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, connecting, Data) ->
@@ -326,22 +342,24 @@ handle_event(state_timeout, health_check, connecting, Data) ->
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
 handle_event(enter, _OldState, connected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
+    UpdatedData = Data#data{status = connected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
     _ = emqx_alarm:deactivate(Data#data.id),
-    Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
-    {next_state, connected, Data, Actions};
+    Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
+    {next_state, connected, UpdatedData, Actions};
 handle_event(state_timeout, health_check, connected, Data) ->
     handle_connected_health_check(Data);
 %% State: DISCONNECTED
 handle_event(enter, _OldState, disconnected, Data) ->
-    insert_cache(Data#data.id, Data#data.group, Data),
-    handle_disconnected_state_enter(Data);
+    UpdatedData = Data#data{status = disconnected},
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
+    handle_disconnected_state_enter(UpdatedData);
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
     start_resource(Data, undefined);
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
 handle_event(enter, _OldState, stopped, Data) ->
-    UpdatedData = Data#data{status = disconnected},
+    UpdatedData = Data#data{status = stopped},
     insert_cache(Data#data.id, Data#data.group, UpdatedData),
     {next_state, stopped, UpdatedData};
 % Ignore all other events
@@ -415,9 +433,10 @@ handle_disconnected_state_enter(Data) ->
 handle_remove_event(From, ClearMetrics, Data) ->
     stop_resource(Data),
     case ClearMetrics of
-        true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id);
+        true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
         false -> ok
     end,
+    ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts),
     {stop_and_reply, normal, [{reply, From, ok}]}.
 
 start_resource(Data, From) ->
@@ -433,7 +452,7 @@ start_resource(Data, From) ->
             _ = maybe_alarm(disconnected, Data#data.id),
             %% Keep track of the error reason why the connection did not work
             %% so that the Reason can be returned when the verification call is made.
-            UpdatedData = Data#data{status = disconnected, error = Reason},
+            UpdatedData = Data#data{error = Reason},
             Actions = maybe_reply([], From, Err),
             {next_state, disconnected, UpdatedData, Actions}
     end.
@@ -449,9 +468,6 @@ stop_resource(Data) ->
     _ = maybe_clear_alarm(Data#data.id),
     ok.
 
-make_manager_id(ResId) ->
-    emqx_resource:generate_id(ResId).
-
 make_test_id() ->
     RandId = iolist_to_binary(emqx_misc:gen_id(16)),
     <<?TEST_ID_PREFIX, RandId/binary>>.
@@ -481,7 +497,7 @@ handle_connected_health_check(Data) ->
         Data,
         fun
             (connected, UpdatedData) ->
-                Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+                Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
                 {keep_state, UpdatedData, Actions};
             (Status, UpdatedData) ->
                 ?SLOG(error, #{
@@ -504,6 +520,9 @@ with_health_check(Data, Func) ->
     insert_cache(ResId, UpdatedData#data.group, UpdatedData),
     Func(Status, UpdatedData).
 
+health_check_interval(Opts) ->
+    maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
+
 maybe_alarm(connected, _ResId) ->
     ok;
 maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->
@@ -542,10 +561,12 @@ maybe_reply(Actions, undefined, _Reply) ->
 maybe_reply(Actions, From, Reply) ->
     [{reply, From, Reply} | Actions].
 
+-spec data_record_to_external_map_with_metrics(data()) -> resource_data().
 data_record_to_external_map_with_metrics(Data) ->
     #{
         id => Data#data.id,
         mod => Data#data.mod,
+        callback_mode => Data#data.callback_mode,
         config => Data#data.config,
         status => Data#data.status,
         state => Data#data.state,

+ 11 - 6
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -15,22 +15,20 @@
 %%--------------------------------------------------------------------
 -module(emqx_resource_sup).
 
+-include("emqx_resource.hrl").
+
 -behaviour(supervisor).
 
 -export([start_link/0]).
 
 -export([init/1]).
 
-%% set a very large pool size in case all the workers busy
--define(POOL_SIZE, 64).
-
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
-    Metrics = emqx_metrics_worker:child_spec(resource_metrics),
-
+    Metrics = emqx_metrics_worker:child_spec(?RES_METRICS),
     ResourceManager =
         #{
             id => emqx_resource_manager_sup,
@@ -40,4 +38,11 @@ init([]) ->
             type => supervisor,
             modules => [emqx_resource_manager_sup]
         },
-    {ok, {SupFlags, [Metrics, ResourceManager]}}.
+    WorkerSup = #{
+        id => emqx_resource_worker_sup,
+        start => {emqx_resource_worker_sup, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
+    },
+    {ok, {SupFlags, [Metrics, ResourceManager, WorkerSup]}}.

+ 17 - 0
apps/emqx_resource/src/emqx_resource_utils.erl

@@ -0,0 +1,17 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_utils).

+ 449 - 0
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -0,0 +1,449 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% This module implements async message sending, disk message queuing,
+%%  and message batching using ReplayQ.
+
+-module(emqx_resource_worker).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(gen_statem).
+
+-export([
+    start_link/3,
+    query/3,
+    block/1,
+    block/2,
+    resume/1
+]).
+
+-export([
+    simple_sync_query/2,
+    simple_async_query/3
+]).
+
+-export([
+    callback_mode/0,
+    init/1,
+    terminate/2,
+    code_change/3
+]).
+
+-export([running/3, blocked/3]).
+
+-export([queue_item_marshaller/1, estimate_size/1]).
+
+-export([reply_after_query/4, batch_reply_after_query/4]).
+
+%% Delay, in milliseconds, used for the `resume` state timeout that a blocked
+%% worker arms after a failed resume attempt.
+-define(RESUME_INTERVAL, 15000).
+
+%% Default for the `batch_size` option (count of requests per batch).
+-define(DEFAULT_BATCH_SIZE, 100).
+%% Default for the `batch_time` option (milliseconds).
+-define(DEFAULT_BATCH_TIME, 10).
+
+%% Wrapper for entries handed to the replayq queue.
+-define(Q_ITEM(REQUEST), {q_item, REQUEST}).
+
+%% A request together with the party that should receive its result.
+-define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}).
+%% The result of a request, addressed to the party that issued it.
+-define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}).
+%% Turn the single RESULT of a batch into one ?REPLY per ?QUERY in BATCH.
+-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
+
+%% Construct a resource error; Msg is iodata and is stored as a binary.
+-define(RESOURCE_ERROR(Reason, Msg),
+    {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
+).
+%% Pattern-matching counterpart of ?RESOURCE_ERROR (uses `:=`, so match-only).
+-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
+
+%% Identifier of the resource a worker serves.
+-type id() :: binary().
+%% Shape produced by the ?QUERY macro.
+-type query() :: {query, from(), request()}.
+-type request() :: term().
+%% Who receives the result of a request.
+-type from() :: pid() | reply_fun().
+
+-export_type([query_opts/0]).
+
+%% NOTE(review): nothing visible in this module calls or implements
+%% `batcher_flush/2`; confirm whether this callback declaration is still needed.
+-callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
+    {{from(), result()}, NewCbState :: term()}.
+
+%% gen_statem callback: each state is handled by the function of the same
+%% name (running/3 and blocked/3).
+callback_mode() -> [state_functions].
+
+%% Start a worker for resource Id, locally registered as name(Id, Index).
+start_link(Id, Index, Opts) ->
+    ServerName = {local, name(Id, Index)},
+    InitArg = {Id, Index, Opts},
+    gen_statem:start_link(ServerName, ?MODULE, InitArg, []).
+
+%% Send Request to one of the workers of resource Id. The worker is picked by
+%% the `pick_key` option (default: the calling pid); the call is bounded by
+%% the `timeout` option (default: infinity).
+-spec query(id(), request(), query_opts()) -> Result :: term().
+query(Id, Request, Opts) ->
+    pick_call(
+        Id,
+        maps:get(pick_key, Opts, self()),
+        {query, Request},
+        maps:get(timeout, Opts, infinity)
+    ).
+
+%% Query the resource synchronously from the calling process: the request
+%% does not go through a worker, so it is neither batched nor queued.
+-spec simple_sync_query(id(), request()) -> Result :: term().
+simple_sync_query(Id, Request) ->
+    Query = ?QUERY(self(), Request),
+    Result = call_query(sync, Id, Query, 1),
+    _ = handle_query_result(Id, Result, false),
+    Result.
+
+%% Asynchronous counterpart of simple_sync_query/2: ReplyFun is attached to
+%% the query as its reply target; no batching or queuing takes place.
+-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
+simple_async_query(Id, Request, ReplyFun) ->
+    Query = ?QUERY(ReplyFun, Request),
+    Result = call_query(async, Id, Query, 1),
+    _ = handle_query_result(Id, Result, false),
+    Result.
+
+%% Ask the worker (fire-and-forget) to enter the blocked state.
+-spec block(pid() | atom()) -> ok.
+block(Worker) ->
+    gen_statem:cast(Worker, block).
+
+%% Same as block/1, additionally handing over queries for the worker to queue.
+-spec block(pid() | atom(), [query()]) -> ok.
+block(Worker, Queries) ->
+    gen_statem:cast(Worker, {block, Queries}).
+
+%% Ask the worker (fire-and-forget) to try leaving the blocked state.
+-spec resume(pid() | atom()) -> ok.
+resume(Worker) ->
+    gen_statem:cast(Worker, resume).
+
+%% gen_statem init callback.
+%% Joins the gproc pool of resource Id as worker {Id, Index}, optionally opens
+%% a replayq queue, and builds the worker state map. The worker starts in the
+%% `blocked` state and immediately casts `resume` to itself.
+init({Id, Index, Opts}) ->
+    process_flag(trap_exit, true),
+    true = gproc_pool:connect_worker(Id, {Id, Index}),
+    BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
+    %% The queue is only opened when `queue_enabled` is set; otherwise the
+    %% `queue` field stays `undefined`.
+    Queue =
+        case maps:get(queue_enabled, Opts, false) of
+            true ->
+                replayq:open(#{
+                    dir => disk_queue_dir(Id, Index),
+                    seg_bytes => 10000000,
+                    sizer => fun ?MODULE:estimate_size/1,
+                    marshaller => fun ?MODULE:queue_item_marshaller/1
+                });
+            false ->
+                undefined
+        end,
+    St = #{
+        id => Id,
+        index => Index,
+        %% query_mode = dynamic | sync | async
+        %% TODO:
+        %%  dynamic mode is async mode when things are going well, but becomes sync mode
+        %%  if the resource worker is overloaded
+        query_mode => maps:get(query_mode, Opts, sync),
+        async_reply_fun => maps:get(async_reply_fun, Opts, undefined),
+        batch_enabled => maps:get(batch_enabled, Opts, false),
+        batch_size => BatchSize,
+        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        queue => Queue,
+        %% accumulated batch and the number of free slots left in it
+        acc => [],
+        acc_left => BatchSize,
+        %% flush timer; matched as {TRef, Ref} by running/3
+        tref => undefined
+    },
+    {ok, blocked, St, {next_event, cast, resume}}.
+
+%% State function for the `running` state.
+%%  - `resume` is a no-op, the worker is already running;
+%%  - `block` / `{block, Batch}` move the worker to the `blocked` state, the
+%%    latter after appending Batch to the queue (when one is configured);
+%%  - `{query, Request}` calls are sent or accumulated by query_or_acc/3;
+%%  - `{flush, Ref}` flushes the accumulated batch when Ref belongs to the
+%%    current flush timer, stale timer messages are ignored;
+%%  - any other info message is logged and dropped.
+running(cast, resume, _St) ->
+    keep_state_and_data;
+running(cast, block, St) ->
+    %% The state is named `blocked` (see blocked/3); there is no `block`
+    %% state function to transition to.
+    {next_state, blocked, St};
+running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
+    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+    {next_state, blocked, St#{queue := Q1}};
+running({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
+    From = maybe_quick_return(QM, From0, ReplyFun),
+    query_or_acc(From, Request, St);
+running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
+    flush(St#{tref := undefined});
+running(info, {flush, _Ref}, _St) ->
+    keep_state_and_data;
+running(info, Info, _St) ->
+    ?SLOG(error, #{msg => unexpected_msg, info => Info}),
+    keep_state_and_data.
+
+%% State function for the `blocked` state.
+%%  - `block` is a no-op, `{block, Batch}` appends Batch to the queue (when
+%%    one is configured);
+%%  - `resume`, as a cast or as a state timeout, tries to leave the state via
+%%    do_resume/1;
+%%  - `{query, Request}` calls are handled by handle_blocked/3;
+%%  - info messages are logged and dropped, mirroring running/3. Without this
+%%    clause a flush-timer message or an 'EXIT' message (the worker traps
+%%    exits) arriving while blocked would crash it with function_clause.
+blocked(cast, block, _St) ->
+    keep_state_and_data;
+blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
+    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+    {keep_state, St#{queue := Q1}};
+blocked(cast, resume, St) ->
+    do_resume(St);
+blocked(state_timeout, resume, St) ->
+    do_resume(St);
+blocked({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
+    From = maybe_quick_return(QM, From0, ReplyFun),
+    handle_blocked(From, Request, St);
+blocked(info, Info, _St) ->
+    ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}),
+    keep_state_and_data.
+
+%% Disconnects this worker from the gproc pool it joined in init/1.
+%% NOTE(review): the gen_statem behaviour documents its callback as
+%% terminate/3 (Reason, State, Data); confirm that this arity-2 function is
+%% actually invoked on shutdown, and adjust the definition and the export
+%% together if it is not.
+terminate(_Reason, #{id := Id, index := Index}) ->
+    gproc_pool:disconnect_worker(Id, {Id, Index}).
+
+%% Code upgrade hook: keeps the state unchanged.
+%% NOTE(review): the gen_statem behaviour documents code_change/4
+%% (OldVsn, OldState, OldData, Extra); confirm this arity-3 variant is intended.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Marshaller given to replayq in init/1. It works in both directions: a
+%% ?Q_ITEM term is encoded to a binary, and a binary is decoded to a term.
+queue_item_marshaller(Encoded) when is_binary(Encoded) ->
+    binary_to_term(Encoded);
+queue_item_marshaller(?Q_ITEM(_) = Item) ->
+    term_to_binary(Item).
+
+%% Sizer given to replayq in init/1: the size of a queue item is the number
+%% of bytes of its marshalled form.
+estimate_size(QItem) ->
+    %% byte_size/1 states the intent (the marshalled item is a binary) where
+    %% the generic size/1 also accepts tuples.
+    byte_size(queue_item_marshaller(QItem)).
+
+%%==============================================================================
+%% Decide who receives the query result.
+%% In `sync` mode the gen_statem caller keeps waiting and is returned as the
+%% reply target. In `async` mode the caller is answered `ok` right away and
+%% the configured reply fun becomes the reply target.
+maybe_quick_return(async, Caller, ReplyFun) ->
+    gen_statem:reply(Caller, ok),
+    ReplyFun;
+maybe_quick_return(sync, Caller, _ReplyFun) ->
+    Caller.
+
+%% Pick a worker of pool Id by Key and forward Query to it as a call.
+%% Returns the worker's reply, or a resource error when no worker can be
+%% picked (`not_created`) or the call times out (`timeout`).
+pick_call(Id, Key, Query, Timeout) ->
+    try gproc_pool:pick_worker(Id, Key) of
+        Pid when is_pid(Pid) ->
+            %% The `of` section of a `try` is not covered by its `catch`
+            %% clauses, so the call gets a `try` of its own; otherwise the
+            %% timeout exit would propagate to the caller.
+            try
+                gen_statem:call(Pid, Query, {clean_timeout, Timeout})
+            catch
+                exit:{timeout, _} ->
+                    ?RESOURCE_ERROR(timeout, "call resource timeout")
+            end;
+        _ ->
+            ?RESOURCE_ERROR(not_created, "resource not found")
+    catch
+        error:badarg ->
+            ?RESOURCE_ERROR(not_created, "resource not found")
+    end.
+
+%% Try to leave the `blocked` state.
+%% Without a queue, or with an empty one, the worker goes straight to
+%% `running`. Otherwise the head of the queue is sent synchronously: if the
+%% result says the worker must stay blocked, the attempt is retried after
+%% ?RESUME_INTERVAL; if not, the head is dropped and the next item is tried.
+do_resume(#{queue := undefined} = St) ->
+    {next_state, running, St};
+do_resume(#{queue := Q, id := Id} = St) ->
+    case replayq:peek(Q) of
+        empty ->
+            {next_state, running, St};
+        ?Q_ITEM(FirstQuery) ->
+            Result = call_query(sync, Id, FirstQuery, 1),
+            case handle_query_result(Id, Result, false) of
+                %% Send failed because resource down
+                true ->
+                    {keep_state, St, {state_timeout, ?RESUME_INTERVAL, resume}};
+                %% Send ok or failed but the resource is working
+                false ->
+                    %% We schedule the next 'resume' through a zero state timeout
+                    %% rather than looping here, to give the worker a chance to
+                    %% process 'query' requests.
+                    {keep_state, St#{queue => drop_head(Q)}, {state_timeout, 0, resume}}
+            end
+    end.
+
+%% Handle a query received while blocked: the reply target is told that the
+%% resource is blocked, and the query is appended to the queue (when one is
+%% configured).
+handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
+    Reply = ?REPLY(From, Request, ?RESOURCE_ERROR(blocked, "resource is blocked")),
+    _ = reply_caller(Id, Reply),
+    QItem = ?Q_ITEM(?QUERY(From, Request)),
+    {keep_state, St#{queue := maybe_append_queue(Q, [QItem])}}.
+
+%% Pop one item off the queue, acknowledge it, and return the updated queue.
+drop_head(Queue0) ->
+    PopOpts = #{count_limit => 1},
+    {Queue, AckRef, _Items} = replayq:pop(Queue0, PopOpts),
+    ok = replayq:ack(Queue, AckRef),
+    Queue.
+
+%% Handle a query while running.
+%% With batching enabled the query is added to the accumulator; the batch is
+%% flushed once it is full, otherwise the flush timer is ensured.
+%% With batching disabled the query is sent right away; when the result asks
+%% for it, the query is queued and the worker moves to `blocked`.
+query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) ->
+    St = St0#{acc := [?QUERY(From, Request) | Acc], acc_left := Left - 1},
+    if
+        Left =< 1 -> flush(St);
+        true -> {keep_state, ensure_flush_timer(St)}
+    end;
+query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) ->
+    ShouldBlock = send_query(QM, From, Request, Id),
+    case ShouldBlock of
+        false ->
+            {keep_state, St};
+        true ->
+            QItem = ?Q_ITEM(?QUERY(From, Request)),
+            {next_state, blocked, St#{queue := maybe_append_queue(Q, [QItem])}}
+    end.
+
+%% Send a single query and deliver its result to the reply target.
+%% Returns what reply_caller/2 returns.
+send_query(QM, From, Request, Id) ->
+    Query = ?QUERY(From, Request),
+    reply_caller(Id, ?REPLY(From, Request, call_query(QM, Id, Query, 1))).
+
+%% Sends the accumulated batch to the resource and resets the accumulator.
+%% An empty accumulator is a no-op. Otherwise the batch is sent in one call,
+%% the flush timer is cancelled and `acc`/`acc_left` are reset. If any reply
+%% reports that the worker must block, the whole batch is appended to the
+%% queue and the worker moves to `blocked`.
+flush(#{acc := []} = St) ->
+    {keep_state, St};
+flush(
+    #{
+        id := Id,
+        acc := Acc,
+        batch_size := Size,
+        queue := Q0,
+        query_mode := QM
+    } = St
+) ->
+    %% `query_or_acc/3` prepends each new query to `acc`, so the accumulator is
+    %% newest-first. Reverse it so requests are sent, answered and queued in
+    %% arrival order.
+    Batch = lists:reverse(Acc),
+    Result = call_query(QM, Id, Batch, length(Batch)),
+    St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
+    case batch_reply_caller(Id, Result, Batch) of
+        true ->
+            Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]),
+            {next_state, blocked, St1#{queue := Q1}};
+        false ->
+            {keep_state, St1}
+    end.
+
+%% Appends `Items` to the replay queue; with no queue configured
+%% (`undefined`) the items are discarded and `undefined` is returned.
+maybe_append_queue(Q, Items) ->
+    case Q of
+        undefined -> undefined;
+        _ -> replayq:append(Q, Items)
+    end.
+
+%% Replies to every caller of a batch and returns whether the worker should
+%% block. The block flag starts as `false` and is threaded through
+%% reply_caller/3 for each reply.
+batch_reply_caller(Id, BatchResult, Batch) ->
+    %% the `Mod:on_batch_query/3` returns a single result for a batch,
+    %% so we need to expand
+    Replies = ?EXPAND(BatchResult, Batch),
+    ReplyOne = fun(Reply, BlockWorker) -> reply_caller(Id, Reply, BlockWorker) end,
+    lists:foldl(ReplyOne, false, Replies).
+
+%% Same as reply_caller/3 with the block flag initialised to `false`.
+reply_caller(Id, Reply) ->
+    reply_caller(Id, Reply, false).
+
+%% Delivers `Result` to whoever issued the request and then lets
+%% handle_query_result/3 decide whether the worker should block.
+%% The reply target is one of:
+%%   * `undefined`         - nobody to answer;
+%%   * `{ReplyFun, Args}`  - a callback, invoked with `Result` appended to Args;
+%%   * anything else       - treated as a gen_statem `From`.
+reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
+    handle_query_result(Id, Result, BlockWorker);
+reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
+    case Result of
+        {async_return, _} ->
+            %% the callback is not invoked for an `async_return` wrapper
+            ok;
+        _ ->
+            _ = erlang:apply(ReplyFun, Args ++ [Result]),
+            ok
+    end,
+    handle_query_result(Id, Result, BlockWorker);
+reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
+    gen_statem:reply(From, Result),
+    handle_query_result(Id, Result, BlockWorker).
+
+%% Updates the resource metrics for one query result and returns whether the
+%% worker must block.
+%% `true` is returned when the result shows the resource is unusable
+%% (`not_connected`, `blocked`, `resource_down`). In every other case the
+%% incoming `BlockWorker` flag is passed through unchanged.
+%% Clause order matters: the specific resource-error patterns must precede
+%% the generic `{error, _}` clause.
+handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
+    BlockWorker;
+handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
+    NotWorking == not_connected; NotWorking == blocked
+->
+    true;
+handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
+    BlockWorker;
+handle_query_result(Id, {error, _}, BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    BlockWorker;
+handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
+    true;
+handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
+    true;
+handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
+    %% The async callback itself returned an error. Count it as a failure
+    %% instead of tripping the assertion in the final clause and crashing
+    %% the worker.
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    BlockWorker;
+handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
+    BlockWorker;
+handle_query_result(_Id, {async_return, {ok, _}}, BlockWorker) ->
+    %% The request was accepted by the async callback; like
+    %% `{async_return, ok}` nothing is counted here.
+    BlockWorker;
+handle_query_result(Id, Result, BlockWorker) ->
+    %% assert: anything reaching this clause must be an ok-shaped result
+    true = is_ok_result(Result),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    BlockWorker.
+
+%% Looks up the resource and, when it is `connected`, bumps the `matched`
+%% metric by `QueryLen` and dispatches the query (or batch) to the callback
+%% module. Every other lookup outcome is turned into a resource error
+%% without touching the callback module.
+call_query(QM, Id, Query, QueryLen) ->
+    Lookup = emqx_resource_manager:ets_lookup(Id),
+    case Lookup of
+        {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} ->
+            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, QueryLen),
+            Mode = call_mode(QM, CM),
+            apply_query_fun(Mode, Mod, Id, Query, ResSt);
+        {ok, _Group, #{status := stopped}} ->
+            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, #{status := Status}} when Status == connecting; Status == disconnected ->
+            ?RESOURCE_ERROR(not_connected, "resource not connected");
+        {error, not_found} ->
+            ?RESOURCE_ERROR(not_found, "resource not found")
+    end.
+
+%% Evaluates EXPR (a call into the callback module) and converts any raised
+%% exception into a `?RESOURCE_ERROR(exception, Msg)` value instead of letting
+%% it propagate. REQ is only used for the error message.
+%% The macro is not hygienic: it reads a variable named `Id` from the
+%% expansion site and binds ERR, REASON, STACKTRACE and MSG there.
+%% `??EXPR` embeds the source text of EXPR in the message, and the formatted
+%% message is capped at 1024 characters.
+-define(APPLY_RESOURCE(EXPR, REQ),
+    try
+        %% if the callback module (connector) wants to return an error that
+        %% makes the current resource goes into the `error` state, it should
+        %% return `{resource_down, Reason}`
+        EXPR
+    catch
+        ERR:REASON:STACKTRACE ->
+            MSG = io_lib:format(
+                "call query failed, func: ~s, id: ~s, error: ~0p, Request: ~0p",
+                [??EXPR, Id, {ERR, REASON, STACKTRACE}, REQ],
+                [{chars_limit, 1024}]
+            ),
+            ?RESOURCE_ERROR(exception, MSG)
+    end
+).
+
+%% Dispatches a query to the callback module `Mod`. The first argument is the
+%% effective call mode and the fourth is either a single query or a non-empty
+%% batch, giving four combinations:
+%%   sync  + single -> Mod:on_query/3
+%%   async + single -> Mod:on_query_async/4
+%%   sync  + batch  -> Mod:on_batch_query/3
+%%   async + batch  -> Mod:on_batch_query_async/4
+%% Async variants pass a `{ReplyFun, Args}` pair carrying this process' pid,
+%% the resource id and the original query/batch, and wrap the callback's
+%% return value as `{async_return, Result}`.
+%% Every call goes through ?APPLY_RESOURCE, so exceptions raised by the
+%% callback come back as resource errors. Each clause emits a trace point.
+apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt) ->
+    ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
+    ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
+apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt) ->
+    ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
+    ReplyFun = fun ?MODULE:reply_after_query/4,
+    ?APPLY_RESOURCE(
+        begin
+            Result = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt),
+            {async_return, Result}
+        end,
+        Request
+    );
+apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
+    ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
+    %% only the request part of each query is handed to the callback module
+    Requests = [Request || ?QUERY(_From, Request) <- Batch],
+    ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
+apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
+    ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
+    Requests = [Request || ?QUERY(_From, Request) <- Batch],
+    ReplyFun = fun ?MODULE:batch_reply_after_query/4,
+    ?APPLY_RESOURCE(
+        begin
+            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt),
+            {async_return, Result}
+        end,
+        Batch
+    ).
+
+%% Reply callback handed to `Mod:on_query_async/4`. Answers the original
+%% caller and, if the result says the worker must block, asks the worker
+%% `Pid` to block with this query.
+reply_after_query(Pid, Id, ?QUERY(From, Request) = Query, Result) ->
+    ShouldBlock = reply_caller(Id, ?REPLY(From, Request, Result)),
+    case ShouldBlock of
+        false -> ok;
+        true -> ?MODULE:block(Pid, [Query])
+    end.
+
+%% Reply callback handed to `Mod:on_batch_query_async/4`. Answers every caller
+%% of the batch and, if any reply says the worker must block, asks the worker
+%% `Pid` to block with the whole batch.
+batch_reply_after_query(Pid, Id, Batch, Result) ->
+    ShouldBlock = batch_reply_caller(Id, Result, Batch),
+    case ShouldBlock of
+        false -> ok;
+        true -> ?MODULE:block(Pid, Batch)
+    end.
+
+%%==============================================================================
+%% Combines the worker's query mode with the connector's callback mode.
+%% The call is asynchronous only when both sides allow it.
+call_mode(async, async_if_possible) -> async;
+call_mode(async, always_sync) -> sync;
+call_mode(sync, _) -> sync.
+
+%% Returns `true` for the atom `ok` and for any tuple whose first element is
+%% `ok` (e.g. `{ok, Value}`); `false` for everything else.
+is_ok_result(ok) ->
+    true;
+%% The size guard keeps the empty tuple `{}` from raising `badarg` in
+%% element/2; it simply is not an ok result.
+is_ok_result(R) when is_tuple(R), tuple_size(R) >= 1 ->
+    erlang:element(1, R) =:= ok;
+is_ok_result(_) ->
+    false.
+
+%% Builds the atom `<module>:<Id>:<Index>` for a worker.
+%% NOTE(review): list_to_atom/1 creates a new atom per distinct (Id, Index)
+%% pair and atoms are never garbage collected - confirm resource ids are
+%% bounded.
+-spec name(id(), integer()) -> atom().
+name(Id, Index) ->
+    %% lists:concat/1 renders the module atom itself; the binary id and the
+    %% integer index are converted explicitly.
+    Parts = [?MODULE, ":", binary_to_list(Id), ":", integer_to_list(Index)],
+    list_to_atom(lists:concat(Parts)).
+
+%% Returns the directory of the on-disk queue for worker `Index` of
+%% resource `Id`: <data_dir>/<node>/<Id>/queue:<Index>.
+disk_queue_dir(Id, Index) ->
+    %% The data dir must come first. filename:join/1 discards every component
+    %% that precedes an absolute path, so with node() in front an absolute
+    %% data dir silently dropped the node name, while a relative one placed
+    %% the queue outside the data dir.
+    filename:join([emqx:data_dir(), node(), Id, "queue:" ++ integer_to_list(Index)]).
+
+%% Arms the flush timer unless one is already pending. The timer sends
+%% `{flush, Ref}` to this process after `batch_time`; the state keeps both
+%% the timer reference and the unique `Ref`.
+ensure_flush_timer(#{tref := undefined, batch_time := BatchTime} = State) ->
+    FlushRef = make_ref(),
+    Timer = erlang:send_after(BatchTime, self(), {flush, FlushRef}),
+    State#{tref => {Timer, FlushRef}};
+ensure_flush_timer(State) ->
+    State.
+
+%% Cancels a pending flush timer, if any, and clears `tref` in the state.
+cancel_flush_timer(#{tref := undefined} = State) ->
+    State;
+cancel_flush_timer(#{tref := {Timer, _FlushRef}} = State) ->
+    _ = erlang:cancel_timer(Timer),
+    State#{tref => undefined}.

+ 136 - 0
apps/emqx_resource/src/emqx_resource_worker_sup.erl

@@ -0,0 +1,136 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_resource_worker_sup).
+-behaviour(supervisor).
+
+%%%=============================================================================
+%%% Exports and Definitions
+%%%=============================================================================
+
+%% External API
+-export([start_link/0]).
+
+-export([start_workers/2, stop_workers/2]).
+
+%% Callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%%=============================================================================
+%%% API
+%%%=============================================================================
+
+%% Starts the supervisor and registers it locally under ?SERVER (this
+%% module's name).
+-spec start_link() -> supervisor:startlink_ret().
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%%=============================================================================
+%%% Callbacks
+%%%=============================================================================
+
+%% Supervisor callback: a `one_for_one` supervisor allowing 100 restarts
+%% within 30 seconds, started with no static children.
+-spec init(list()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}} | ignore.
+init([]) ->
+    {ok, {#{strategy => one_for_one, intensity => 100, period => 30}, []}}.
+
+%% Ensures the gproc pool for `ResId` exists, then registers and starts one
+%% worker per pool slot (1..PoolSize). Crashes with `badmatch` if a worker
+%% cannot be started.
+start_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
+    StartOne = fun(Idx) ->
+        _ = ensure_worker_added(ResId, Idx),
+        ok = ensure_worker_started(ResId, Idx, Opts)
+    end,
+    lists:foreach(StartOne, lists:seq(1, PoolSize)).
+
+%% Removes every worker of `ResId` (slots 1..PoolSize) and then the pool
+%% itself. Results of the individual removals are ignored; always returns ok.
+stop_workers(ResId, Opts) ->
+    Slots = lists:seq(1, pool_size(Opts)),
+    _ = [ensure_worker_removed(ResId, Idx) || Idx <- Slots],
+    ensure_worker_pool_removed(ResId),
+    ok.
+
+%%%=============================================================================
+%%% Internal
+%%%=============================================================================
+%% Pool size comes from the `worker_pool_size` option and defaults to the
+%% number of online schedulers.
+pool_size(Opts) ->
+    Default = erlang:system_info(schedulers_online),
+    case maps:find(worker_pool_size, Opts) of
+        {ok, Size} -> Size;
+        error -> Default
+    end.
+
+%% Creates the gproc pool, treating "already exists" as success.
+%% Any other error propagates.
+ensure_worker_pool(ResId, Type, Opts) ->
+    try gproc_pool:new(ResId, Type, Opts) of
+        _ -> ok
+    catch
+        error:exists -> ok
+    end.
+
+%% Adds worker `{ResId, Idx}` to the pool at slot `Idx`, treating
+%% "already exists" as success. Any other error propagates.
+ensure_worker_added(ResId, Idx) ->
+    try gproc_pool:add_worker(ResId, {ResId, Idx}, Idx) of
+        _ -> ok
+    catch
+        error:exists -> ok
+    end.
+
+-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
+%% Starts worker `Idx` of `ResId` as a transient child. A worker that is
+%% already running counts as success.
+%% NOTE(review): the child is attached to `emqx_resource_sup`, not to this
+%% module's own ?SERVER - confirm this is intended.
+%% NOTE(review): `already_present` (spec known but child not running) also
+%% returns ok without restarting the child - confirm.
+ensure_worker_started(ResId, Idx, Opts) ->
+    WorkerMod = emqx_resource_worker,
+    ChildSpec = #{
+        id => ?CHILD_ID(WorkerMod, ResId, Idx),
+        start => {WorkerMod, start_link, [ResId, Idx, Opts]},
+        restart => transient,
+        shutdown => 5000,
+        type => worker,
+        modules => [WorkerMod]
+    },
+    StartResult = supervisor:start_child(emqx_resource_sup, ChildSpec),
+    case StartResult of
+        {ok, _Pid} -> ok;
+        {error, {already_started, _}} -> ok;
+        {error, already_present} -> ok;
+        {error, _} = Err -> Err
+    end.
+
+%% Terminates and deletes worker `Idx` of `ResId`, then removes it from the
+%% gproc pool. An unknown child counts as success; in that case the pool
+%% entry is left untouched.
+ensure_worker_removed(ResId, Idx) ->
+    ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
+    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+        {error, not_found} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason};
+        ok ->
+            DeleteResult = supervisor:delete_child(emqx_resource_sup, ChildId),
+            _ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
+            DeleteResult
+    end.
+
+%% Deletes the gproc pool of `ResId`; a `badarg` error from gproc_pool is
+%% treated as success. Always returns ok.
+ensure_worker_pool_removed(ResId) ->
+    try gproc_pool:delete(ResId) of
+        _ -> ok
+    catch
+        error:badarg -> ok
+    end.

+ 162 - 0
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -0,0 +1,162 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_demo).
+
+-include_lib("typerefl/include/types.hrl").
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_query_async/4,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+-export([counter_loop/1, set_callback_mode/1]).
+
+%% callbacks for emqx_resource config schema
+-export([roots/0]).
+
+%% Config schema of the demo connector: two root fields, `name` and
+%% `register`, each described by a field function below.
+roots() ->
+    [
+        {name, fun name/1},
+        {register, fun register/1}
+    ].
+
+%% `name`: a required atom; every other schema property is undefined.
+name(type) -> atom();
+name(required) -> true;
+name(_) -> undefined.
+
+%% `register`: a required boolean defaulting to false.
+register(type) -> boolean();
+register(required) -> true;
+register(default) -> false;
+register(_) -> undefined.
+
+%% The callback mode is kept in a persistent term so tests can switch it at
+%% runtime through set_callback_mode/1.
+-define(CM_KEY, {?MODULE, callback_mode}).
+%% Returns the mode stored by set_callback_mode/1. persistent_term:get/1
+%% raises `badarg` if no mode has been stored yet.
+callback_mode() ->
+    persistent_term:get(?CM_KEY).
+
+%% Stores the callback mode returned by callback_mode/0.
+set_callback_mode(Mode) ->
+    persistent_term:put(?CM_KEY, Mode).
+
+%% Starts the demo resource.
+%% `create_error := true` makes the start fail with an exception. Otherwise a
+%% counter process is spawned (and registered under `Name` when the
+%% `register` option is true) and the resource state is the given options
+%% extended with `id` and `pid`. All other option keys, `stop_error`
+%% included, are carried over unchanged.
+on_start(_InstId, #{create_error := true}) ->
+    error("some error");
+on_start(InstId, #{name := Name} = Opts) ->
+    ShouldRegister = maps:get(register, Opts, false),
+    CounterPid = spawn_counter_process(Name, ShouldRegister),
+    {ok, Opts#{id => InstId, pid => CounterPid}}.
+
+%% Stops the demo resource. With `stop_error := true` the stop is reported
+%% as failed and the counter process is left alone; otherwise the counter
+%% process is sent a `shutdown` exit signal.
+on_stop(_InstId, #{stop_error := true}) ->
+    {error, stop_error};
+on_stop(_InstId, #{pid := CounterPid}) ->
+    _ = erlang:exit(CounterPid, shutdown),
+    ok.
+
+%% Synchronous query callback of the demo resource.
+%%   get_state         -> {ok, State}
+%%   get_state_failed  -> {error, State}
+%%   {inc_counter, N}  -> sends `{inc, N}` to the counter process, returns ok
+%%   get_counter       -> asks the counter process for its value and waits up
+%%                        to one second: {ok, Num} | {error, timeout}
+on_query(_InstId, get_state, State) ->
+    {ok, State};
+on_query(_InstId, get_state_failed, State) ->
+    {error, State};
+on_query(_InstId, {inc_counter, Amount}, #{pid := CounterPid}) ->
+    CounterPid ! {inc, Amount},
+    ok;
+on_query(_InstId, get_counter, #{pid := CounterPid}) ->
+    %% the unique reference pairs the reply with this request
+    Ref = make_ref(),
+    CounterPid ! {{self(), Ref}, get},
+    receive
+        {Ref, Value} -> {ok, Value}
+    after 1000 ->
+        {error, timeout}
+    end.
+
+%% Async query callback of the demo resource: runs the query synchronously
+%% through on_query/3 and hands the result straight to the reply function.
+%% The instance id is used, so it no longer carries the "unused" underscore
+%% prefix.
+on_query_async(InstId, Query, ReplyFun, State) ->
+    Result = on_query(InstId, Query, State),
+    apply_reply(ReplyFun, Result).
+
+%% Batch query callback of the demo resource. The kind of the first request
+%% selects the handler for the whole batch.
+on_batch_query(InstId, BatchReq, State) ->
+    %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
+    FirstReq = hd(BatchReq),
+    case FirstReq of
+        {inc_counter, _} ->
+            batch_inc_counter(InstId, BatchReq, State);
+        get_counter ->
+            batch_get_counter(InstId, State)
+    end.
+
+%% Sums the increments of a batch and applies them as a single
+%% `{inc_counter, Total}` query. Any request in the batch that is not an
+%% `inc_counter` raises `mixed_requests_not_allowed`.
+batch_inc_counter(InstId, BatchReq, State) ->
+    AddOne = fun
+        ({inc_counter, N}, Total) ->
+            Total + N;
+        (Req, _Total) ->
+            error({mixed_requests_not_allowed, {inc_counter, Req}})
+    end,
+    TotalN = lists:foldl(AddOne, 0, BatchReq),
+    on_query(InstId, {inc_counter, TotalN}, State).
+
+%% A batch of `get_counter` requests is answered by one `get_counter` query.
+batch_get_counter(InstId, State) ->
+    on_query(InstId, get_counter, State).
+
+%% Health check of the demo resource. `health_check_error := true` reports
+%% `disconnected` immediately; otherwise the check sleeps 300 ms and reports
+%% `connected` only while the counter process is alive.
+on_get_status(_InstId, #{health_check_error := true}) ->
+    disconnected;
+on_get_status(_InstId, #{pid := CounterPid}) ->
+    timer:sleep(300),
+    Alive = is_process_alive(CounterPid),
+    if
+        Alive -> connected;
+        true -> disconnected
+    end.
+
+%% Spawns a linked counter process starting at 0, optionally registers it
+%% under `Name`, and returns its pid.
+spawn_counter_process(Name, Register) ->
+    InitialState = #{counter => 0},
+    CounterPid = spawn_link(?MODULE, counter_loop, [InitialState]),
+    true = maybe_register(Name, CounterPid, Register),
+    CounterPid.
+
+%% Receive loop of the counter process.
+%%   {inc, N}                 -> continue with the counter increased by N
+%%   {{FromPid, ReqRef}, get} -> reply `{ReqRef, Counter}` to FromPid
+counter_loop(#{counter := Count} = State) ->
+    receive
+        {inc, N} ->
+            counter_loop(#{counter => Count + N});
+        {{FromPid, ReqRef}, get} ->
+            FromPid ! {ReqRef, Count},
+            counter_loop(State)
+    end.
+
+%% Registers `Pid` under `Name` when the flag is true, logging the name and
+%% its current owner first; does nothing (and returns true) otherwise.
+maybe_register(_Name, _Pid, false) ->
+    true;
+maybe_register(Name, Pid, true) ->
+    ct:pal("---- Register Name: ~p", [Name]),
+    ct:pal("---- whereis(): ~p", [whereis(Name)]),
+    erlang:register(Name, Pid).
+
+%% Invokes a `{Fun, Args}` reply callback with `Result` appended as the last
+%% argument and returns whatever the callback returns.
+apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
+    FullArgs = lists:append(Args, [Result]),
+    erlang:apply(ReplyFun, FullArgs).

+ 192 - 45
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -22,10 +22,13 @@
 -include_lib("common_test/include/ct.hrl").
 -include("emqx_resource.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(TEST_RESOURCE, emqx_test_resource).
+-define(TEST_RESOURCE, emqx_connector_demo).
 -define(ID, <<"id">>).
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
+-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
+-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -34,7 +37,10 @@ groups() ->
     [].
 
 init_per_testcase(_, Config) ->
+    emqx_connector_demo:set_callback_mode(always_sync),
     Config.
+end_per_testcase(_, _Config) ->
+    _ = emqx_resource:remove(?ID).
 
 init_per_suite(Config) ->
     code:ensure_loaded(?TEST_RESOURCE),
@@ -80,7 +86,7 @@ t_create_remove(_) ->
         #{name => test_resource},
         #{}
     ),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid)),
 
@@ -110,7 +116,7 @@ t_create_remove_local(_) ->
         #{name => test_resource},
         #{}
     ),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid)),
 
@@ -127,7 +133,7 @@ t_create_remove_local(_) ->
     {error, _} = emqx_resource:remove_local(?ID),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        ?RESOURCE_ERROR(not_created),
         emqx_resource:query(?ID, get_state)
     ),
     ?assertNot(is_process_alive(Pid)).
@@ -143,23 +149,23 @@ t_do_not_start_after_created(_) ->
     %% the resource should remain `disconnected` after created
     timer:sleep(200),
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
     ?assertMatch(
-        {ok, _, #{status := disconnected}},
+        {ok, _, #{status := stopped}},
         emqx_resource:get_instance(?ID)
     ),
 
     %% start the resource manually..
     ok = emqx_resource:start(?ID),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid)),
 
     %% restart the resource
     ok = emqx_resource:restart(?ID),
     ?assertNot(is_process_alive(Pid)),
-    #{pid := Pid2} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid2)),
 
     ok = emqx_resource:remove_local(?ID),
@@ -174,50 +180,170 @@ t_query(_) ->
         #{name => test_resource}
     ),
 
-    Pid = self(),
-    Success = fun() -> Pid ! success end,
-    Failure = fun() -> Pid ! failure end,
-
-    #{pid := _} = emqx_resource:query(?ID, get_state),
-    #{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}),
-    #{pid := _} = emqx_resource:query(?ID, get_state, undefined),
-    #{pid := _} = emqx_resource:query(?ID, get_state_failed, undefined),
-
-    receive
-        Message -> ?assertEqual(success, Message)
-    after 100 ->
-        ?assert(false)
-    end,
+    {ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        ?RESOURCE_ERROR(not_created),
         emqx_resource:query(<<"unknown">>, get_state)
     ),
 
     ok = emqx_resource:remove_local(?ID).
 
-t_healthy_timeout(_) ->
+t_query_counter(_) ->
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
-        #{name => <<"test_resource">>},
-        #{health_check_timeout => 200}
+        #{name => test_resource, register => true}
     ),
-    timer:sleep(500),
+
+    {ok, 0} = emqx_resource:query(?ID, get_counter),
+    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+    {ok, 1} = emqx_resource:query(?ID, get_counter),
+    ok = emqx_resource:query(?ID, {inc_counter, 5}),
+    {ok, 6} = emqx_resource:query(?ID, get_counter),
 
     ok = emqx_resource:remove_local(?ID).
 
-t_healthy(_) ->
+t_batch_query_counter(_) ->
     {ok, _} = emqx_resource:create_local(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
-        #{name => <<"test_resource">>}
+        #{name => test_resource, register => true},
+        #{batch_enabled => true}
     ),
-    timer:sleep(400),
 
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    ?check_trace(
+        ?TRACE_OPTS,
+        emqx_resource:query(?ID, get_counter),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, 0}, Result),
+            QueryTrace = ?of_kind(call_batch_query, Trace),
+            ?assertMatch([#{batch := [{query, _, get_counter}]}], QueryTrace)
+        end
+    ),
+
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(1000),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_batch_query, Trace),
+            ?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace)
+        end
+    ),
+    {ok, 1000} = emqx_resource:query(?ID, get_counter),
+
+    ok = emqx_resource:remove_local(?ID).
+
+%% Resource created with `query_mode => async` while the connector's
+%% callback mode is `always_sync` (set in init_per_testcase/2): the queries
+%% must be traced as plain `call_query` events, and after 1000 increments the
+%% counter and the metrics must agree.
+t_query_counter_async(_) ->
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{query_mode => async}
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(1000),
+        fun(Trace) ->
+            %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+    %% wait for 1s to make sure all the async queries are sent to the resource.
+    timer:sleep(1000),
+    %% simple query ignores the query_mode and batching settings in the resource_worker
+    ?check_trace(
+        ?TRACE_OPTS,
+        emqx_resource:simple_sync_query(?ID, get_counter),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, 1000}, Result),
+            %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace)
+        end
+    ),
+    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
+    %% 1000 increments plus the two get_counter queries
+    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ok = emqx_resource:remove_local(?ID).
+
+%% Same scenario as t_query_counter_async/1 but with the connector switched
+%% to `async_if_possible`: the increments must be traced as
+%% `call_query_async` events, and the configured `async_reply_fun` must have
+%% recorded exactly one `ok` result per increment in the ETS table.
+t_query_counter_async_2(_) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    %% reply callback: stores every result under a fresh key
+    Insert = fun(Tab, Result) ->
+        ets:insert(Tab, {make_ref(), Result})
+    end,
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{query_mode => async, async_reply_fun => {Insert, [Tab0]}}
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(1000),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_query_async, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+
+    %% wait for 1s to make sure all the async queries are sent to the resource.
+    timer:sleep(1000),
+    %% simple query ignores the query_mode and batching settings in the resource_worker
+    ?check_trace(
+        ?TRACE_OPTS,
+        emqx_resource:simple_sync_query(?ID, get_counter),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, 1000}, Result),
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace)
+        end
+    ),
+    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
+    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(1000, ets:info(Tab0, size)),
+    ?assert(
+        lists:all(
+            fun
+                ({_, ok}) -> true;
+                (_) -> false
+            end,
+            ets:tab2list(Tab0)
+        )
+    ),
+    ok = emqx_resource:remove_local(?ID).
+
+%% A resource whose health check does not complete in time must answer
+%% queries with a `not_connected` resource error.
+%% NOTE(review): the name is a binary while `register => true`, so the
+%% resource may already fail on start - confirm which of the two the test
+%% is meant to exercise.
+t_healthy_timeout(_) ->
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => <<"bad_not_atom_name">>, register => true},
+        %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
+        #{health_check_interval => 200}
+    ),
+    ?assertMatch(
+        ?RESOURCE_ERROR(not_connected),
+        emqx_resource:query(?ID, get_state)
+    ),
+    ok = emqx_resource:remove_local(?ID).
+
+t_healthy(_) ->
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource}
+    ),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     timer:sleep(300),
     emqx_resource:set_resource_status_connecting(?ID),
 
@@ -229,10 +355,10 @@ t_healthy(_) ->
 
     erlang:exit(Pid, shutdown),
 
-    ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
+    ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
 
     ?assertMatch(
-        [#{status := connecting}],
+        [#{status := disconnected}],
         emqx_resource:list_instances_verbose()
     ),
 
@@ -260,7 +386,7 @@ t_stop_start(_) ->
         #{}
     ),
 
-    #{pid := Pid0} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid0)),
 
@@ -269,14 +395,14 @@ t_stop_start(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
 
     ok = emqx_resource:restart(?ID),
     timer:sleep(300),
 
-    #{pid := Pid1} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid1)).
 
@@ -302,7 +428,7 @@ t_stop_start_local(_) ->
         #{}
     ),
 
-    #{pid := Pid0} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid0)),
 
@@ -311,13 +437,13 @@ t_stop_start_local(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
 
     ok = emqx_resource:restart(?ID),
 
-    #{pid := Pid1} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid1)).
 
@@ -358,6 +484,10 @@ t_create_dry_run_local(_) ->
     [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
 
 create_dry_run_local_succ() ->
+    case whereis(test_resource) of
+        undefined -> ok;
+        Pid -> exit(Pid, kill)
+    end,
     ?assertEqual(
         ok,
         emqx_resource:create_dry_run_local(
@@ -368,17 +498,17 @@ create_dry_run_local_succ() ->
     ?assertEqual(undefined, whereis(test_resource)).
 
 t_create_dry_run_local_failed(_) ->
-    {Res1, _} = emqx_resource:create_dry_run_local(
+    Res1 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
-        #{cteate_error => true}
+        #{create_error => true}
     ),
-    ?assertEqual(error, Res1),
+    ?assertMatch({error, _}, Res1),
 
-    {Res2, _} = emqx_resource:create_dry_run_local(
+    Res2 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, health_check_error => true}
     ),
-    ?assertEqual(error, Res2),
+    ?assertMatch({error, _}, Res2),
 
     Res3 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
@@ -400,7 +530,7 @@ t_reset_metrics(_) ->
         #{name => test_resource}
     ),
 
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     emqx_resource:reset_metrics(?ID),
     ?assert(is_process_alive(Pid)),
     ok = emqx_resource:remove(?ID),
@@ -419,6 +549,23 @@ t_auto_retry(_) ->
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
+%% Spawns N processes that each send `{inc_counter, 1}` to the test resource,
+%% then waits for all of them to report completion. Fails the test case if
+%% any process has not reported within one second of being waited for.
+inc_counter_in_parallel(N) ->
+    Parent = self(),
+    Pids = [
+        erlang:spawn(fun() ->
+            emqx_resource:query(?ID, {inc_counter, 1}),
+            Parent ! {complete, self()}
+        end)
+     || _ <- lists:seq(1, N)
+    ],
+    %% collect one completion message per spawned process, in spawn order
+    [
+        receive
+            {complete, Pid} -> ok
+        after 1000 ->
+            ct:fail({wait_for_query_timeout, Pid})
+        end
+     || Pid <- Pids
+    ].
 
 bin_config() ->
     <<"\"name\": \"test_resource\"">>.

+ 0 - 110
apps/emqx_resource/test/emqx_test_resource.erl

@@ -1,110 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_test_resource).
-
--include_lib("typerefl/include/types.hrl").
-
--behaviour(emqx_resource).
-
-%% callbacks of behaviour emqx_resource
--export([
-    on_start/2,
-    on_stop/2,
-    on_query/4,
-    on_get_status/2
-]).
-
-%% callbacks for emqx_resource config schema
--export([roots/0]).
-
-roots() ->
-    [
-        {name, fun name/1},
-        {register, fun register/1}
-    ].
-
-name(type) -> atom();
-name(required) -> true;
-name(_) -> undefined.
-
-register(type) -> boolean();
-register(required) -> true;
-register(default) -> false;
-register(_) -> undefined.
-
-on_start(_InstId, #{create_error := true}) ->
-    error("some error");
-on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
-    Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
-        id => InstId,
-        stop_error => true,
-        pid => spawn_dummy_process(Name, Register)
-    }};
-on_start(InstId, #{name := Name, health_check_error := true} = Opts) ->
-    Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
-        id => InstId,
-        health_check_error => true,
-        pid => spawn_dummy_process(Name, Register)
-    }};
-on_start(InstId, #{name := Name} = Opts) ->
-    Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
-        id => InstId,
-        pid => spawn_dummy_process(Name, Register)
-    }}.
-
-on_stop(_InstId, #{stop_error := true}) ->
-    {error, stop_error};
-on_stop(_InstId, #{pid := Pid}) ->
-    erlang:exit(Pid, shutdown),
-    ok.
-
-on_query(_InstId, get_state, AfterQuery, State) ->
-    emqx_resource:query_success(AfterQuery),
-    State;
-on_query(_InstId, get_state_failed, AfterQuery, State) ->
-    emqx_resource:query_failed(AfterQuery),
-    State.
-
-on_get_status(_InstId, #{health_check_error := true}) ->
-    disconnected;
-on_get_status(_InstId, #{pid := Pid}) ->
-    timer:sleep(300),
-    case is_process_alive(Pid) of
-        true -> connected;
-        false -> connecting
-    end.
-
-spawn_dummy_process(Name, Register) ->
-    spawn(
-        fun() ->
-            true =
-                case Register of
-                    true -> register(Name, self());
-                    _ -> true
-                end,
-            Ref = make_ref(),
-            receive
-                Ref -> ok
-            end
-        end
-    ).

+ 14 - 15
lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl

@@ -13,9 +13,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -33,6 +34,7 @@
 
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
+callback_mode() -> always_sync.
 
 on_start(InstId, Config) ->
     start_client(InstId, Config).
@@ -52,11 +54,10 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
 on_query(
     _InstId,
     {send_message, Data},
-    AfterQuery,
     #{producer := Producer, ordering_key := OrderingKey, payload := Payload}
 ) ->
     Record = to_record(OrderingKey, Payload, Data),
-    do_append(AfterQuery, Producer, Record).
+    do_append(Producer, Record).
 
 on_get_status(_InstId, #{client := Client}) ->
     case is_alive(Client) of
@@ -260,27 +261,26 @@ to_record(OrderingKey, Payload) when is_binary(OrderingKey) ->
 to_record(OrderingKey, Payload) ->
     hstreamdb:to_record(OrderingKey, raw, Payload).
 
-do_append(AfterQuery, Producer, Record) ->
-    do_append(AfterQuery, false, Producer, Record).
+do_append(Producer, Record) ->
+    do_append(false, Producer, Record).
 
 %% TODO: this append is async, remove or change it after we have better disk cache.
-% do_append(AfterQuery, true, Producer, Record) ->
+% do_append(true, Producer, Record) ->
 %     case hstreamdb:append(Producer, Record) of
 %         ok ->
 %             ?SLOG(debug, #{
 %                 msg => "hstreamdb producer async append success",
 %                 record => Record
-%             }),
-%             emqx_resource:query_success(AfterQuery);
-%         {error, Reason} ->
+%             });
+%         {error, Reason} = Err ->
 %             ?SLOG(error, #{
 %                 msg => "hstreamdb producer async append failed",
 %                 reason => Reason,
 %                 record => Record
 %             }),
-%             emqx_resource:query_failed(AfterQuery)
+%             Err
 %     end;
-do_append(AfterQuery, false, Producer, Record) ->
+do_append(false, Producer, Record) ->
     %% TODO: this append is sync, but it does not support [Record], can only append one Record.
     %% Change it after we have better disk cache.
     case hstreamdb:append_flush(Producer, Record) of
@@ -288,15 +288,14 @@ do_append(AfterQuery, false, Producer, Record) ->
             ?SLOG(debug, #{
                 msg => "hstreamdb producer sync append success",
                 record => Record
-            }),
-            emqx_resource:query_success(AfterQuery);
-        {error, Reason} ->
+            });
+        {error, Reason} = Err ->
             ?SLOG(error, #{
                 msg => "hstreamdb producer sync append failed",
                 reason => Reason,
                 record => Record
             }),
-            emqx_resource:query_failed(AfterQuery)
+            Err
     end.
 
 client_name(InstId) ->

+ 9 - 8
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -13,9 +13,10 @@
 
 %% callbacks of behaviour emqx_resource
 -export([
+    callback_mode/0,
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -28,6 +29,7 @@
 
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
+callback_mode() -> always_sync.
 
 on_start(InstId, Config) ->
     start_client(InstId, Config).
@@ -35,8 +37,8 @@ on_start(InstId, Config) ->
 on_stop(_InstId, #{client := Client}) ->
     influxdb:stop_client(Client).
 
-on_query(InstId, {send_message, Data}, AfterQuery, State) ->
-    do_query(InstId, {send_message, Data}, AfterQuery, State).
+on_query(InstId, {send_message, Data}, State) ->
+    do_query(InstId, {send_message, Data}, State).
 
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) of
@@ -308,7 +310,7 @@ ssl_config(SSL = #{enable := true}) ->
 %% -------------------------------------------------------------------------------------------------
 %% Query
 
-do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) ->
+do_query(InstId, {send_message, Data}, State = #{client := Client}) ->
     {Points, Errs} = data_to_points(Data, State),
     lists:foreach(
         fun({error, Reason}) ->
@@ -326,15 +328,14 @@ do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client})
                 msg => "influxdb write point success",
                 connector => InstId,
                 points => Points
-            }),
-            emqx_resource:query_success(AfterQuery);
-        {error, Reason} ->
+            });
+        {error, Reason} = Err ->
             ?SLOG(error, #{
                 msg => "influxdb write point failed",
                 connector => InstId,
                 reason => Reason
             }),
-            emqx_resource:query_failed(AfterQuery)
+            Err
     end.
 
 %% -------------------------------------------------------------------------------------------------