Просмотр исходного кода

refactor: create emqx_resource_worker_sup for resource workers

Shawn 3 лет назад
Родитель
Сommit
2fb42e4d37

+ 5 - 7
apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl

@@ -24,7 +24,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2,
     connect/1
 ]).
@@ -45,7 +45,7 @@ on_start(InstId, Opts) ->
 on_stop(_InstId, #{pool_name := PoolName}) ->
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) ->
+on_query(InstId, get_jwks, #{pool_name := PoolName}) ->
     Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover),
     case Result of
         {error, Reason} ->
@@ -54,20 +54,18 @@ on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) ->
                 connector => InstId,
                 command => get_jwks,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result;
-on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
+on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) ->
     lists:foreach(
         fun({_, Worker}) ->
             ok = ecpool_worker:exec(Worker, {emqx_authn_jwks_client, update, [Opts]}, infinity)
         end,
         ecpool:workers(PoolName)
     ),
-    emqx_resource:query_success(AfterQuery),
     ok.
 
 on_get_status(_InstId, #{pool_name := PoolName}) ->

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -174,7 +174,7 @@ authenticate(
                 reason => Reason
             }),
             ignore;
-        Doc ->
+        {ok, Doc} ->
             case check_password(Password, Doc, State) of
                 ok ->
                     {ok, is_superuser(Doc, State)};

+ 12 - 17
apps/emqx_connector/src/emqx_connector_http.erl

@@ -28,7 +28,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -225,7 +225,7 @@ on_stop(InstId, #{pool_name := PoolName}) ->
     }),
     ehttpc_sup:stop_pool(PoolName).
 
-on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
+on_query(InstId, {send_message, Msg}, State) ->
     case maps:get(request, State, undefined) of
         undefined ->
             ?SLOG(error, #{msg => "request_not_found", connector => InstId});
@@ -241,18 +241,16 @@ on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
             on_query(
                 InstId,
                 {undefined, Method, {Path, Headers, Body}, Timeout, Retry},
-                AfterQuery,
                 State
             )
     end;
-on_query(InstId, {Method, Request}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, 5000, 2}, AfterQuery, State);
-on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
-    on_query(InstId, {undefined, Method, Request, Timeout, 2}, AfterQuery, State);
+on_query(InstId, {Method, Request}, State) ->
+    on_query(InstId, {undefined, Method, Request, 5000, 2}, State);
+on_query(InstId, {Method, Request, Timeout}, State) ->
+    on_query(InstId, {undefined, Method, Request, Timeout, 2}, State);
 on_query(
     InstId,
     {KeyOrNum, Method, Request, Timeout, Retry},
-    AfterQuery,
     #{pool_name := PoolName, base_path := BasePath} = State
 ) ->
     ?TRACE(
@@ -275,32 +273,29 @@ on_query(
     of
         {error, Reason} ->
             ?SLOG(error, #{
-                msg => "http_connector_do_reqeust_failed",
+                msg => "http_connector_do_request_failed",
                 request => NRequest,
                 reason => Reason,
                 connector => InstId
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            emqx_resource:query_success(AfterQuery);
+            ok;
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            emqx_resource:query_success(AfterQuery);
+            ok;
         {ok, StatusCode, _} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         {ok, StatusCode, _, _} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            }),
-            emqx_resource:query_failed(AfterQuery)
+            })
     end,
     Result.
 

+ 4 - 5
apps/emqx_connector/src/emqx_connector_ldap.erl

@@ -27,7 +27,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -99,7 +99,7 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
+on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = State) ->
     Request = {Base, Filter, Attributes},
     ?TRACE(
         "QUERY",
@@ -119,10 +119,9 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P
                 request => Request,
                 connector => InstId,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result.
 

+ 3 - 7
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -27,7 +27,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -189,7 +189,6 @@ on_stop(InstId, #{poolname := PoolName}) ->
 on_query(
     InstId,
     {Action, Collection, Filter, Projector},
-    AfterQuery,
     #{poolname := PoolName} = State
 ) ->
     Request = {Action, Collection, Filter, Projector},
@@ -212,14 +211,11 @@ on_query(
                 reason => Reason,
                 connector => InstId
             }),
-            emqx_resource:query_failed(AfterQuery),
             {error, Reason};
         {ok, Cursor} when is_pid(Cursor) ->
-            emqx_resource:query_success(AfterQuery),
-            mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000);
+            {ok, mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)};
         Result ->
-            emqx_resource:query_success(AfterQuery),
-            Result
+            {ok, Result}
     end.
 
 -dialyzer({nowarn_function, [on_get_status/2]}).

+ 5 - 5
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -37,7 +37,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -181,12 +181,12 @@ on_stop(_InstId, #{name := InstanceId}) ->
             })
     end.
 
-on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) ->
-    emqx_resource:query_success(AfterQuery);
-on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
+on_query(_InstId, {message_received, _Msg}, _State) ->
+    ok;
+on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
     emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
-    emqx_resource:query_success(AfterQuery).
+    ok.
 
 on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
     AutoReconn = maps:get(auto_reconnect, Conf, true),

+ 6 - 11
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -26,7 +26,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -122,14 +122,13 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {TypeOrKey, SQLOrKey}, AfterQuery, State) ->
-    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, AfterQuery, State);
-on_query(InstId, {TypeOrKey, SQLOrKey, Params}, AfterQuery, State) ->
-    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, AfterQuery, State);
+on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);
+on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) ->
+    on_query(InstId, {TypeOrKey, SQLOrKey, Params, default_timeout}, State);
 on_query(
     InstId,
     {TypeOrKey, SQLOrKey, Params, Timeout},
-    AfterQuery,
     #{poolname := PoolName, prepare_statement := Prepares} = State
 ) ->
     LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
@@ -147,7 +146,6 @@ on_query(
             ),
             %% kill the poll worker to trigger reconnection
             _ = exit(Conn, restart),
-            emqx_resource:query_failed(AfterQuery),
             Result;
         {error, not_prepared} ->
             ?SLOG(
@@ -157,13 +155,12 @@ on_query(
             case prepare_sql(Prepares, PoolName) of
                 ok ->
                     %% not return result, next loop will try again
-                    on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, AfterQuery, State);
+                    on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
                 {error, Reason} ->
                     ?SLOG(
                         error,
                         LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
                     ),
-                    emqx_resource:query_failed(AfterQuery),
                     {error, Reason}
             end;
         {error, Reason} ->
@@ -171,10 +168,8 @@ on_query(
                 error,
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
             ),
-            emqx_resource:query_failed(AfterQuery),
             Result;
         _ ->
-            emqx_resource:query_success(AfterQuery),
             Result
     end.
 

+ 6 - 7
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -29,7 +29,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -116,9 +116,9 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {Type, NameOrSQL}, AfterQuery, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {Type, NameOrSQL, []}, AfterQuery, State);
-on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
+on_query(InstId, {Type, NameOrSQL}, #{poolname := _PoolName} = State) ->
+    on_query(InstId, {Type, NameOrSQL, []}, State);
+on_query(InstId, {Type, NameOrSQL, Params}, #{poolname := PoolName} = State) ->
     ?SLOG(debug, #{
         msg => "postgresql connector received sql query",
         connector => InstId,
@@ -132,10 +132,9 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
                 connector => InstId,
                 sql => NameOrSQL,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterQuery);
+            });
         _ ->
-            emqx_resource:query_success(AfterQuery)
+            ok
     end,
     Result.
 

+ 4 - 5
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -28,7 +28,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -177,7 +177,7 @@ on_stop(InstId, #{poolname := PoolName, type := Type}) ->
         _ -> emqx_plugin_libs_pool:stop_pool(PoolName)
     end.
 
-on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
+on_query(InstId, {cmd, Command}, #{poolname := PoolName, type := Type} = State) ->
     ?TRACE(
         "QUERY",
         "redis_connector_received",
@@ -195,10 +195,9 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T
                 connector => InstId,
                 sql => Command,
                 reason => Reason
-            }),
-            emqx_resource:query_failed(AfterCommand);
+            });
         _ ->
-            emqx_resource:query_success(AfterCommand)
+            ok
     end,
     Result.
 

+ 6 - 8
apps/emqx_resource/include/emqx_resource.hrl

@@ -21,7 +21,7 @@
 -type resource_config() :: term().
 -type resource_spec() :: map().
 -type resource_state() :: term().
--type resource_status() :: connected | disconnected | connecting.
+-type resource_status() :: connected | disconnected | connecting | stopped.
 -type resource_data() :: #{
     id := resource_id(),
     mod := module(),
@@ -45,13 +45,11 @@
     %% periodically.
     auto_retry_interval => integer()
 }.
--type after_query() ::
-    {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]}
-    | undefined.
-
-%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
-%% actions upon query failure
--type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
+-type query_result() ::
+    ok
+    | {ok, term()}
+    | {error, term()}
+    | {resource_down, term()}.
 
 -define(TEST_ID_PREFIX, "_test_:").
 -define(RES_METRICS, resource_metrics).

+ 6 - 26
apps/emqx_resource/src/emqx_resource.erl

@@ -23,13 +23,6 @@
 
 -export([list_types/0]).
 
-%% APIs for behaviour implementations
-
--export([
-    query_success/1,
-    query_failed/1
-]).
-
 %% APIs for instances
 
 -export([
@@ -113,7 +106,8 @@
 -export([inc_metrics_funcs/1, inc_success/1, inc_failed/1]).
 
 -optional_callbacks([
-    on_query/4,
+    on_query/3,
+    on_batch_query/3,
     on_get_status/2
 ]).
 
@@ -125,7 +119,9 @@
 -callback on_stop(resource_id(), resource_state()) -> term().
 
 %% when calling emqx_resource:query/3
--callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term().
+-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
+
+-callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
 %% when calling emqx_resource:health_check/2
 -callback on_get_status(resource_id(), resource_state()) ->
@@ -149,22 +145,6 @@ is_resource_mod(Module) ->
             proplists:get_value(behaviour, Info, []),
     lists:member(?MODULE, Behaviour).
 
--spec query_success(after_query()) -> ok.
-query_success(undefined) -> ok;
-query_success({OnSucc, _}) -> exec_query_after_calls(OnSucc).
-
--spec query_failed(after_query()) -> ok.
-query_failed(undefined) -> ok;
-query_failed({_, OnFailed}) -> exec_query_after_calls(OnFailed).
-
-exec_query_after_calls(Funcs) ->
-    lists:foreach(
-        fun({Fun, Arg}) ->
-            emqx_resource_utils:safe_exec(Fun, Arg)
-        end,
-        Funcs
-    ).
-
 %% =================================================================================
 %% APIs for resource instances
 %% =================================================================================
@@ -247,7 +227,7 @@ query(ResId, Request) ->
     emqx_resource_worker:query(ResId, Request).
 
 -spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) ->
-    ok.
+    Result :: term().
 query_async(ResId, Request, ReplyFun) ->
     emqx_resource_worker:query_async(ResId, Request, ReplyFun).
 

+ 9 - 6
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -53,7 +53,7 @@
 
 -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
 -define(HEALTHCHECK_INTERVAL, 15000).
--define(ETS_TABLE, emqx_resource_manager).
+-define(ETS_TABLE, ?MODULE).
 -define(WAIT_FOR_RESOURCE_DELAY, 100).
 -define(T_OPERATION, 5000).
 -define(T_LOOKUP, 1000).
@@ -114,9 +114,9 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         [matched, success, failed, exception, resource_down],
         [matched]
     ),
+    ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
     case maps:get(start_after_created, Opts, true) of
         true ->
-            ok = emqx_resource_sup:start_workers(ResId, Opts),
             wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
         false ->
             ok
@@ -317,7 +317,7 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manually_health_check(From, Data);
 % State: CONNECTING
 handle_event(enter, _OldState, connecting, Data) ->
-    UpdatedData = Data#data{status = connected},
+    UpdatedData = Data#data{status = connecting},
     insert_cache(Data#data.id, Data#data.group, Data),
     Actions = [{state_timeout, 0, health_check}],
     {keep_state, UpdatedData, Actions};
@@ -332,7 +332,7 @@ handle_event(enter, _OldState, connected, Data) ->
     UpdatedData = Data#data{status = connected},
     insert_cache(Data#data.id, Data#data.group, UpdatedData),
     _ = emqx_alarm:deactivate(Data#data.id),
-    Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+    Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
     {next_state, connected, UpdatedData, Actions};
 handle_event(state_timeout, health_check, connected, Data) ->
     handle_connected_health_check(Data);
@@ -423,7 +423,7 @@ handle_remove_event(From, ClearMetrics, Data) ->
         true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
         false -> ok
     end,
-    ok = emqx_resource_sup:stop_workers(Data#data.id, Data#data.opts),
+    ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts),
     {stop_and_reply, normal, [{reply, From, ok}]}.
 
 start_resource(Data, From) ->
@@ -487,7 +487,7 @@ handle_connected_health_check(Data) ->
         Data,
         fun
             (connected, UpdatedData) ->
-                Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+                Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
                 {keep_state, UpdatedData, Actions};
             (Status, UpdatedData) ->
                 ?SLOG(error, #{
@@ -510,6 +510,9 @@ with_health_check(Data, Func) ->
     insert_cache(ResId, UpdatedData#data.group, UpdatedData),
     Func(Status, UpdatedData).
 
+health_check_interval(Opts) ->
+    maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
+
 maybe_alarm(connected, _ResId) ->
     ok;
 maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->

+ 8 - 77
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -19,7 +19,7 @@
 
 -behaviour(supervisor).
 
--export([start_link/0, start_workers/2, stop_workers/2]).
+-export([start_link/0]).
 
 -export([init/1]).
 
@@ -29,7 +29,6 @@ start_link() ->
 init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     Metrics = emqx_metrics_worker:child_spec(?RES_METRICS),
-
     ResourceManager =
         #{
             id => emqx_resource_manager_sup,
@@ -39,79 +38,11 @@ init([]) ->
             type => supervisor,
             modules => [emqx_resource_manager_sup]
         },
-    {ok, {SupFlags, [Metrics, ResourceManager]}}.
-
-start_workers(ResId, Opts) ->
-    PoolSize = pool_size(Opts),
-    _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
-    lists:foreach(
-        fun(Idx) ->
-            _ = ensure_worker_added(ResId, {ResId, Idx}, Idx),
-            ok = ensure_worker_started(ResId, Idx, Opts)
-        end,
-        lists:seq(1, PoolSize)
-    ).
-
-stop_workers(ResId, Opts) ->
-    PoolSize = pool_size(Opts),
-    lists:foreach(
-        fun(Idx) ->
-            ok = ensure_worker_stopped(ResId, Idx),
-            ok = ensure_worker_removed(ResId, {ResId, Idx})
-        end,
-        lists:seq(1, PoolSize)
-    ),
-    _ = gproc_pool:delete(ResId),
-    ok.
-
-pool_size(Opts) ->
-    maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
-
-ensure_worker_pool(Pool, Type, Opts) ->
-    try
-        gproc_pool:new(Pool, Type, Opts)
-    catch
-        error:exists -> ok
-    end,
-    ok.
-
-ensure_worker_added(Pool, Name, Slot) ->
-    try
-        gproc_pool:add_worker(Pool, Name, Slot)
-    catch
-        error:exists -> ok
-    end,
-    ok.
-
-ensure_worker_removed(Pool, Name) ->
-    _ = gproc_pool:remove_worker(Pool, Name),
-    ok.
-
--define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
-ensure_worker_started(ResId, Idx, Opts) ->
-    Mod = emqx_resource_worker,
-    Spec = #{
-        id => ?CHILD_ID(Mod, ResId, Idx),
-        start => {Mod, start_link, [ResId, Idx, Opts]},
-        restart => transient,
-        shutdown => 5000,
-        type => worker,
-        modules => [Mod]
+    WorkerSup = #{
+        id => emqx_resource_worker_sup,
+        start => {emqx_resource_worker_sup, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
     },
-    case supervisor:start_child(emqx_resource_sup, Spec) of
-        {ok, _Pid} -> ok;
-        {error, {already_started, _}} -> ok;
-        {error, already_present} -> ok;
-        {error, _} = Err -> Err
-    end.
-
-ensure_worker_stopped(ResId, Idx) ->
-    ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
-    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
-        ok ->
-            supervisor:delete_child(emqx_resource_sup, ChildId);
-        {error, not_found} ->
-            ok;
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, {SupFlags, [Metrics, ResourceManager, WorkerSup]}}.

+ 77 - 55
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -44,6 +44,8 @@
 
 -export([running/3, blocked/3]).
 
+-export([queue_item_marshaller/1]).
+
 -define(RESUME_INTERVAL, 15000).
 
 %% count
@@ -51,11 +53,15 @@
 %% milliseconds
 -define(DEFAULT_BATCH_TIME, 10).
 
+-define(Q_ITEM(REQUEST), {q_item, REQUEST}).
+
 -define(QUERY(FROM, REQUEST), {FROM, REQUEST}).
 -define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}).
 -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
 
--define(RESOURCE_ERROR(Reason, Msg), {error, {resource_error, #{reason => Reason, msg => Msg}}}).
+-define(RESOURCE_ERROR(Reason, Msg),
+    {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
+).
 -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
 
 -type id() :: binary().
@@ -72,21 +78,21 @@ callback_mode() -> [state_functions, state_enter].
 start_link(Id, Index, Opts) ->
     gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
 
--spec query(id(), request()) -> ok.
+-spec query(id(), request()) -> Result :: term().
 query(Id, Request) ->
-    gen_statem:call(pick(Id, self()), {query, Request}).
+    query(Id, self(), Request).
 
--spec query(id(), term(), request()) -> ok.
-query(Id, Key, Request) ->
-    gen_statem:call(pick(Id, Key), {query, Request}).
+-spec query(id(), term(), request()) -> Result :: term().
+query(Id, PickKey, Request) ->
+    pick_query(call, Id, PickKey, {query, Request}).
 
--spec query_async(id(), request(), reply_fun()) -> ok.
+-spec query_async(id(), request(), reply_fun()) -> Result :: term().
 query_async(Id, Request, ReplyFun) ->
-    gen_statem:cast(pick(Id, self()), {query, Request, ReplyFun}).
+    query_async(Id, self(), Request, ReplyFun).
 
--spec query_async(id(), term(), request(), reply_fun()) -> ok.
-query_async(Id, Key, Request, ReplyFun) ->
-    gen_statem:cast(pick(Id, Key), {query, Request, ReplyFun}).
+-spec query_async(id(), term(), request(), reply_fun()) -> Result :: term().
+query_async(Id, PickKey, Request, ReplyFun) ->
+    pick_query(cast, Id, PickKey, {query, Request, ReplyFun}).
 
 -spec block(pid() | atom()) -> ok.
 block(ServerRef) ->
@@ -97,17 +103,24 @@ resume(ServerRef) ->
     gen_statem:cast(ServerRef, resume).
 
 init({Id, Index, Opts}) ->
+    process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Id, {Id, Index}),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     Queue =
         case maps:get(queue_enabled, Opts, true) of
-            true -> replayq:open(#{dir => disk_queue_dir(Id, Index), seg_bytes => 10000000});
-            false -> undefined
+            true ->
+                replayq:open(#{
+                    dir => disk_queue_dir(Id, Index),
+                    seg_bytes => 10000000,
+                    marshaller => fun ?MODULE:queue_item_marshaller/1
+                });
+            false ->
+                undefined
         end,
     St = #{
         id => Id,
         index => Index,
-        batch_enabled => maps:get(batch_enabled, Opts, true),
+        batch_enabled => maps:get(batch_enabled, Opts, false),
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
@@ -128,7 +141,7 @@ running(cast, {query, Request, ReplyFun}, St) ->
 running({call, From}, {query, Request}, St) ->
     query_or_acc(From, Request, St);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
-    {keep_state, flush(St#{tref := undefined})};
+    flush(St#{tref := undefined});
 running(info, {flush, _Ref}, _St) ->
     keep_state_and_data;
 running(info, Info, _St) ->
@@ -154,12 +167,21 @@ terminate(_Reason, #{id := Id, index := Index}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+queue_item_marshaller(?Q_ITEM(_) = I) ->
+    term_to_binary(I);
+queue_item_marshaller(Bin) when is_binary(Bin) ->
+    binary_to_term(Bin).
+
 %%==============================================================================
-pick(Id, Key) ->
-    Pid = gproc_pool:pick_worker(Id, Key),
-    case is_pid(Pid) of
-        true -> Pid;
-        false -> error({failed_to_pick_worker, {Id, Key}})
+pick_query(Fun, Id, Key, Query) ->
+    try gproc_pool:pick_worker(Id, Key) of
+        Pid when is_pid(Pid) ->
+            gen_statem:Fun(Pid, Query);
+        _ ->
+            ?RESOURCE_ERROR(not_created, "resource not found")
+    catch
+        error:badarg ->
+            ?RESOURCE_ERROR(not_created, "resource not found")
     end.
 
 do_resume(#{queue := undefined} = St) ->
@@ -168,9 +190,9 @@ do_resume(#{queue := Q, id := Id} = St) ->
     case replayq:peek(Q) of
         empty ->
             {next_state, running, St};
-        First ->
+        ?Q_ITEM(First) ->
             Result = call_query(Id, First),
-            case handle_query_result(Id, false, Result) of
+            case handle_query_result(Id, Result, false) of
                 %% Send failed because resource down
                 true ->
                     {keep_state, St, {state_timeout, ?RESUME_INTERVAL, resume}};
@@ -182,6 +204,11 @@ do_resume(#{queue := Q, id := Id} = St) ->
             end
     end.
 
+handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
+    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
+    _ = reply_caller(Id, ?REPLY(From, Request, Error), false),
+    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}.
+
 drop_head(Q) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
@@ -196,26 +223,21 @@ acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
-        true -> {keep_state, flush(St)};
+        true -> flush(St);
         false -> {keep_state, ensure_flush_timer(St)}
     end.
 
 send_query(From, Request, #{id := Id, queue := Q} = St) ->
     Result = call_query(Id, Request),
-    case reply_caller(Id, Q, ?REPLY(From, Request, Result)) of
+    case reply_caller(Id, ?REPLY(From, Request, Result), false) of
         true ->
-            {keep_state, St#{queue := maybe_append_queue(Q, [Request])}};
+            {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}};
         false ->
-            {next_state, blocked, St}
+            {keep_state, St}
     end.
 
-handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
-    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
-    _ = reply_caller(Id, Q, ?REPLY(From, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [Request])}}.
-
 flush(#{acc := []} = St) ->
-    St;
+    {keep_state, St};
 flush(
     #{
         id := Id,
@@ -228,65 +250,65 @@ flush(
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, BatchResults) of
         true ->
-            Q1 = maybe_append_queue(Q0, [Request || ?QUERY(_, Request) <- Batch]),
-            {keep_state, St1#{queue := Q1}};
+            Q1 = maybe_append_queue(Q0, [?Q_ITEM(Request) || ?QUERY(_, Request) <- Batch]),
+            {next_state, blocked, St1#{queue := Q1}};
         false ->
-            {next_state, blocked, St1}
+            {keep_state, St1}
     end.
 
-maybe_append_queue(undefined, _Query) -> undefined;
-maybe_append_queue(Q, Query) -> replayq:append(Q, Query).
+maybe_append_queue(undefined, _Request) -> undefined;
+maybe_append_queue(Q, Request) -> replayq:append(Q, Request).
 
 batch_reply_caller(Id, BatchResults) ->
     lists:foldl(
         fun(Reply, BlockWorker) ->
-            reply_caller(Id, BlockWorker, Reply)
+            reply_caller(Id, Reply, BlockWorker)
         end,
         false,
         BatchResults
     ).
 
-reply_caller(Id, BlockWorker, ?REPLY(undefined, _, Result)) ->
-    handle_query_result(Id, BlockWorker, Result);
-reply_caller(Id, BlockWorker, ?REPLY({ReplyFun, Args}, _, Result)) ->
+reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
+    handle_query_result(Id, Result, BlockWorker);
+reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
     ?SAFE_CALL(ReplyFun(Result, Args)),
-    handle_query_result(Id, BlockWorker, Result);
-reply_caller(Id, BlockWorker, ?REPLY(From, _, Result)) ->
+    handle_query_result(Id, Result, BlockWorker);
+reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
     gen_statem:reply(From, Result),
-    handle_query_result(Id, BlockWorker, Result).
+    handle_query_result(Id, Result, BlockWorker).
 
-handle_query_result(Id, BlockWorker, ok) ->
+handle_query_result(Id, ok, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
     BlockWorker;
-handle_query_result(Id, BlockWorker, {ok, _}) ->
+handle_query_result(Id, {ok, _}, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
     BlockWorker;
-handle_query_result(Id, BlockWorker, ?RESOURCE_ERROR_M(exception, _)) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
     BlockWorker;
-handle_query_result(_Id, _, ?RESOURCE_ERROR_M(NotWorking, _)) when
+handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
     NotWorking == not_connected; NotWorking == blocked
 ->
     true;
-handle_query_result(_Id, BlockWorker, ?RESOURCE_ERROR_M(_, _)) ->
+handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
     BlockWorker;
-handle_query_result(Id, BlockWorker, {error, _}) ->
+handle_query_result(Id, {error, _}, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
     BlockWorker;
-handle_query_result(Id, _BlockWorker, {resource_down, _}) ->
+handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
     true.
 
 call_query(Id, Request) ->
-    do_call_query(on_query, Id, Request).
+    do_call_query(on_query, Id, Request, 1).
 
 call_batch_query(Id, Batch) ->
-    do_call_query(on_batch_query, Id, Batch).
+    do_call_query(on_batch_query, Id, Batch, length(Batch)).
 
-do_call_query(Fun, Id, Data) ->
+do_call_query(Fun, Id, Data, Count) ->
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Data)),
+            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, Count),
             try Mod:Fun(Id, Data, ResourceState) of
                 %% if the callback module (connector) wants to return an error that
                 %% makes the current resource goes into the `error` state, it should

+ 136 - 0
apps/emqx_resource/src/emqx_resource_worker_sup.erl

@@ -0,0 +1,136 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_resource_worker_sup).
+-behaviour(supervisor).
+
+%%%=============================================================================
+%%% Exports and Definitions
+%%%=============================================================================
+
+%% External API
+-export([start_link/0]).
+
+-export([start_workers/2, stop_workers/2]).
+
+%% Callbacks
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%%=============================================================================
+%%% API
+%%%=============================================================================
+
+-spec start_link() -> supervisor:startlink_ret().
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%%=============================================================================
+%%% Callbacks
+%%%=============================================================================
+
+-spec init(list()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}} | ignore.
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 100,
+        period => 30
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
+
+start_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
+    lists:foreach(
+        fun(Idx) ->
+            _ = ensure_worker_added(ResId, Idx),
+            ok = ensure_worker_started(ResId, Idx, Opts)
+        end,
+        lists:seq(1, PoolSize)
+    ).
+
+stop_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    lists:foreach(
+        fun(Idx) ->
+            ensure_worker_removed(ResId, Idx)
+        end,
+        lists:seq(1, PoolSize)
+    ),
+    ensure_worker_pool_removed(ResId),
+    ok.
+
+%%%=============================================================================
+%%% Internal
+%%%=============================================================================
+pool_size(Opts) ->
+    maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
+
+ensure_worker_pool(ResId, Type, Opts) ->
+    try
+        gproc_pool:new(ResId, Type, Opts)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+ensure_worker_added(ResId, Idx) ->
+    try
+        gproc_pool:add_worker(ResId, {ResId, Idx}, Idx)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
+ensure_worker_started(ResId, Idx, Opts) ->
+    Mod = emqx_resource_worker,
+    Spec = #{
+        id => ?CHILD_ID(Mod, ResId, Idx),
+        start => {Mod, start_link, [ResId, Idx, Opts]},
+        restart => transient,
+        shutdown => 5000,
+        type => worker,
+        modules => [Mod]
+    },
+    case supervisor:start_child(emqx_resource_sup, Spec) of
+        {ok, _Pid} -> ok;
+        {error, {already_started, _}} -> ok;
+        {error, already_present} -> ok;
+        {error, _} = Err -> Err
+    end.
+
+ensure_worker_removed(ResId, Idx) ->
+    ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
+    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+        ok ->
+            Res = supervisor:delete_child(emqx_resource_sup, ChildId),
+            _ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
+            Res;
+        {error, not_found} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+ensure_worker_pool_removed(ResId) ->
+    try
+        gproc_pool:delete(ResId)
+    catch
+        error:badarg -> ok
+    end,
+    ok.

+ 33 - 44
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -26,6 +26,7 @@
 -define(TEST_RESOURCE, emqx_test_resource).
 -define(ID, <<"id">>).
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
+-define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -80,7 +81,7 @@ t_create_remove(_) ->
         #{name => test_resource},
         #{}
     ),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid)),
 
@@ -110,7 +111,7 @@ t_create_remove_local(_) ->
         #{name => test_resource},
         #{}
     ),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid)),
 
@@ -127,7 +128,7 @@ t_create_remove_local(_) ->
     {error, _} = emqx_resource:remove_local(?ID),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        ?RESOURCE_ERROR(not_created),
         emqx_resource:query(?ID, get_state)
     ),
     ?assertNot(is_process_alive(Pid)).
@@ -143,23 +144,23 @@ t_do_not_start_after_created(_) ->
     %% the resource should remain `disconnected` after created
     timer:sleep(200),
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
     ?assertMatch(
-        {ok, _, #{status := disconnected}},
+        {ok, _, #{status := stopped}},
         emqx_resource:get_instance(?ID)
     ),
 
     %% start the resource manually..
     ok = emqx_resource:start(?ID),
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid)),
 
     %% restart the resource
     ok = emqx_resource:restart(?ID),
     ?assertNot(is_process_alive(Pid)),
-    #{pid := Pid2} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
     ?assert(is_process_alive(Pid2)),
 
     ok = emqx_resource:remove_local(?ID),
@@ -174,23 +175,10 @@ t_query(_) ->
         #{name => test_resource}
     ),
 
-    Pid = self(),
-    Success = fun() -> Pid ! success end,
-    Failure = fun() -> Pid ! failure end,
-
-    #{pid := _} = emqx_resource:query(?ID, get_state),
-    #{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}),
-    #{pid := _} = emqx_resource:query(?ID, get_state, undefined),
-    #{pid := _} = emqx_resource:query(?ID, get_state_failed, undefined),
-
-    receive
-        Message -> ?assertEqual(success, Message)
-    after 100 ->
-        ?assert(false)
-    end,
+    {ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        ?RESOURCE_ERROR(not_created),
         emqx_resource:query(<<"unknown">>, get_state)
     ),
 
@@ -201,11 +189,14 @@ t_healthy_timeout(_) ->
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
-        #{name => <<"test_resource">>},
-        #{health_check_timeout => 200}
+        #{name => <<"bad_not_atom_name">>, register => true},
+        %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
+        #{health_check_interval => 200}
+    ),
+    ?assertMatch(
+        ?RESOURCE_ERROR(not_connected),
+        emqx_resource:query(?ID, get_state)
     ),
-    timer:sleep(500),
-
     ok = emqx_resource:remove_local(?ID).
 
 t_healthy(_) ->
@@ -213,11 +204,9 @@ t_healthy(_) ->
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
-        #{name => <<"test_resource">>}
+        #{name => test_resource}
     ),
-    timer:sleep(400),
-
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     timer:sleep(300),
     emqx_resource:set_resource_status_connecting(?ID),
 
@@ -229,10 +218,10 @@ t_healthy(_) ->
 
     erlang:exit(Pid, shutdown),
 
-    ?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
+    ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
 
     ?assertMatch(
-        [#{status := connecting}],
+        [#{status := disconnected}],
         emqx_resource:list_instances_verbose()
     ),
 
@@ -260,7 +249,7 @@ t_stop_start(_) ->
         #{}
     ),
 
-    #{pid := Pid0} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid0)),
 
@@ -269,14 +258,14 @@ t_stop_start(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
 
     ok = emqx_resource:restart(?ID),
     timer:sleep(300),
 
-    #{pid := Pid1} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid1)).
 
@@ -302,7 +291,7 @@ t_stop_start_local(_) ->
         #{}
     ),
 
-    #{pid := Pid0} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid0)),
 
@@ -311,13 +300,13 @@ t_stop_start_local(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_connected}}},
+        ?RESOURCE_ERROR(stopped),
         emqx_resource:query(?ID, get_state)
     ),
 
     ok = emqx_resource:restart(?ID),
 
-    #{pid := Pid1} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
 
     ?assert(is_process_alive(Pid1)).
 
@@ -368,17 +357,17 @@ create_dry_run_local_succ() ->
     ?assertEqual(undefined, whereis(test_resource)).
 
 t_create_dry_run_local_failed(_) ->
-    {Res1, _} = emqx_resource:create_dry_run_local(
+    Res1 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
-        #{cteate_error => true}
+        #{create_error => true}
     ),
-    ?assertEqual(error, Res1),
+    ?assertMatch({error, _}, Res1),
 
-    {Res2, _} = emqx_resource:create_dry_run_local(
+    Res2 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, health_check_error => true}
     ),
-    ?assertEqual(error, Res2),
+    ?assertMatch({error, _}, Res2),
 
     Res3 = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
@@ -400,7 +389,7 @@ t_reset_metrics(_) ->
         #{name => test_resource}
     ),
 
-    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     emqx_resource:reset_metrics(?ID),
     ?assert(is_process_alive(Pid)),
     ok = emqx_resource:remove(?ID),

+ 11 - 16
apps/emqx_resource/test/emqx_test_resource.erl

@@ -24,7 +24,7 @@
 -export([
     on_start/2,
     on_stop/2,
-    on_query/4,
+    on_query/3,
     on_get_status/2
 ]).
 
@@ -50,24 +50,20 @@ on_start(_InstId, #{create_error := true}) ->
     error("some error");
 on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
     Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
+    {ok, Opts#{
         id => InstId,
         stop_error => true,
         pid => spawn_dummy_process(Name, Register)
     }};
-on_start(InstId, #{name := Name, health_check_error := true} = Opts) ->
+on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
+    {ok, Opts#{
         id => InstId,
-        health_check_error => true,
         pid => spawn_dummy_process(Name, Register)
     }};
 on_start(InstId, #{name := Name} = Opts) ->
     Register = maps:get(register, Opts, false),
-    {ok, #{
-        name => Name,
+    {ok, Opts#{
         id => InstId,
         pid => spawn_dummy_process(Name, Register)
     }}.
@@ -78,12 +74,10 @@ on_stop(_InstId, #{pid := Pid}) ->
     erlang:exit(Pid, shutdown),
     ok.
 
-on_query(_InstId, get_state, AfterQuery, State) ->
-    emqx_resource:query_success(AfterQuery),
-    State;
-on_query(_InstId, get_state_failed, AfterQuery, State) ->
-    emqx_resource:query_failed(AfterQuery),
-    State.
+on_query(_InstId, get_state, State) ->
+    {ok, State};
+on_query(_InstId, get_state_failed, State) ->
+    {error, State}.
 
 on_get_status(_InstId, #{health_check_error := true}) ->
     disconnected;
@@ -91,10 +85,11 @@ on_get_status(_InstId, #{pid := Pid}) ->
     timer:sleep(300),
     case is_process_alive(Pid) of
         true -> connected;
-        false -> connecting
+        false -> disconnected
     end.
 
 spawn_dummy_process(Name, Register) ->
+    ct:pal("---- Register Name: ~p", [Name]),
     spawn(
         fun() ->
             true =