Просмотр исходного кода

Revert "feat: isolate resource manager processes"

This reverts commit 40cca58d4fd3c52c7118c6d23819f3ccf125f420.
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
f42a5b90df

+ 23 - 13
apps/emqx_resource/src/emqx_resource.erl

@@ -156,7 +156,7 @@ create_local(InstId, Group, ResourceType, Config) ->
                    create_opts()) ->
                    create_opts()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create_local(InstId, Group, ResourceType, Config, Opts) ->
 create_local(InstId, Group, ResourceType, Config, Opts) ->
-    emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts).
+    call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
 
 
 -spec create_dry_run(resource_type(), resource_config()) ->
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
     ok | {error, Reason :: term()}.
@@ -166,7 +166,8 @@ create_dry_run(ResourceType, Config) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
     ok | {error, Reason :: term()}.
 create_dry_run_local(ResourceType, Config) ->
 create_dry_run_local(ResourceType, Config) ->
-    emqx_resource_manager:create_dry_run(ResourceType, Config).
+    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
+    call_instance(RandId, {create_dry_run, ResourceType, Config}).
 
 
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
     {ok, resource_data()} | {error, Reason :: term()}.
@@ -176,7 +177,7 @@ recreate(InstId, ResourceType, Config, Opts) ->
 -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
 -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate_local(InstId, ResourceType, Config, Opts) ->
 recreate_local(InstId, ResourceType, Config, Opts) ->
-    emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts).
+    call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}).
 
 
 -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
 -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
 remove(InstId) ->
 remove(InstId) ->
@@ -184,11 +185,11 @@ remove(InstId) ->
 
 
 -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
 -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
 remove_local(InstId) ->
 remove_local(InstId) ->
-    emqx_resource_manager:remove(InstId).
+    call_instance(InstId, {remove, InstId}).
 
 
 -spec reset_metrics_local(instance_id()) -> ok.
 -spec reset_metrics_local(instance_id()) -> ok.
 reset_metrics_local(InstId) ->
 reset_metrics_local(InstId) ->
-    emqx_resource_manager:reset_metrics(InstId).
+    call_instance(InstId, {reset_metrics, InstId}).
 
 
 -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
 -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
 reset_metrics(InstId) ->
 reset_metrics(InstId) ->
@@ -203,7 +204,13 @@ query(InstId, Request) ->
 %% it is the duty of the Module to apply the `after_query()` functions.
 %% it is the duty of the Module to apply the `after_query()` functions.
 -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
 -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
 query(InstId, Request, AfterQuery) ->
 query(InstId, Request, AfterQuery) ->
-    case emqx_resource_manager:ets_lookup(InstId) of
+    case get_instance(InstId) of
+        {ok, _Group, #{status := connecting}} ->
+            query_error(connecting, <<"cannot serve query when the resource "
+                "instance is still connecting">>);
+        {ok, _Group, #{status := disconnected}} ->
+            query_error(disconnected, <<"cannot serve query when the resource "
+                "instance is disconnected">>);
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
             %% the resource state is readonly to Module:on_query/4
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
             %% and the `after_query()` functions should be thread safe
@@ -223,23 +230,23 @@ restart(InstId) ->
 
 
 -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
 -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
 restart(InstId, Opts) ->
 restart(InstId, Opts) ->
-    emqx_resource_manager:restart(InstId, Opts).
+    call_instance(InstId, {restart, InstId, Opts}).
 
 
 -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
 -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
 stop(InstId) ->
 stop(InstId) ->
-    emqx_resource_manager:stop(InstId).
+    call_instance(InstId, {stop, InstId}).
 
 
 -spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
 -spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
 health_check(InstId) ->
 health_check(InstId) ->
-    emqx_resource_manager:health_check(InstId).
+    call_instance(InstId, {health_check, InstId}).
 
 
 set_resource_status_connecting(InstId) ->
 set_resource_status_connecting(InstId) ->
-    emqx_resource_manager:set_resource_status_connecting(InstId).
+    call_instance(InstId, {set_resource_status_connecting, InstId}).
 
 
 -spec get_instance(instance_id()) ->
 -spec get_instance(instance_id()) ->
     {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
     {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
 get_instance(InstId) ->
 get_instance(InstId) ->
-    emqx_resource_manager:lookup(InstId).
+    emqx_resource_instance:lookup(InstId).
 
 
 -spec list_instances() -> [instance_id()].
 -spec list_instances() -> [instance_id()].
 list_instances() ->
 list_instances() ->
@@ -247,7 +254,7 @@ list_instances() ->
 
 
 -spec list_instances_verbose() -> [resource_data()].
 -spec list_instances_verbose() -> [resource_data()].
 list_instances_verbose() ->
 list_instances_verbose() ->
-    emqx_resource_manager:list_all().
+    emqx_resource_instance:list_all().
 
 
 -spec list_instances_by_type(module()) -> [instance_id()].
 -spec list_instances_by_type(module()) -> [instance_id()].
 list_instances_by_type(ResourceType) ->
 list_instances_by_type(ResourceType) ->
@@ -261,7 +268,7 @@ generate_id(Name) when is_binary(Name) ->
     <<Name/binary, ":", Id/binary>>.
     <<Name/binary, ":", Id/binary>>.
 
 
 -spec list_group_instances(resource_group()) -> [instance_id()].
 -spec list_group_instances(resource_group()) -> [instance_id()].
-list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
+list_group_instances(Group) -> emqx_resource_instance:list_group(Group).
 
 
 -spec call_start(instance_id(), module(), resource_config()) ->
 -spec call_start(instance_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
     {ok, resource_state()} | {error, Reason :: term()}.
@@ -352,6 +359,9 @@ inc_metrics_funcs(InstId) ->
              ],
              ],
     {OnSucc, OnFailed}.
     {OnSucc, OnFailed}.
 
 
+call_instance(InstId, Query) ->
+    emqx_resource_instance:hash_call(InstId, Query).
+
 safe_apply(Func, Args) ->
 safe_apply(Func, Args) ->
     ?SAFE_CALL(erlang:apply(Func, Args)).
     ?SAFE_CALL(erlang:apply(Func, Args)).
 
 

+ 0 - 416
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -1,416 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_resource_manager).
--behaviour(gen_statem).
-
--include("emqx_resource.hrl").
--include("emqx_resource_utils.hrl").
-
-% API
--export([
-    ensure_resource/5,
-    create_dry_run/2,
-    ets_lookup/1,
-    get_metrics/1,
-    health_check/1,
-    list_all/0,
-    list_group/1,
-    lookup/1,
-    recreate/4,
-    remove/1,
-    reset_metrics/1,
-    restart/2,
-    set_resource_status_connecting/1,
-    stop/1
-]).
-
-% Server
--export([start_link/5]).
-
-% Behaviour
--export([init/1, callback_mode/0, handle_event/4, terminate/3]).
-
-% State record
--record(data, {id, group, mod, config, opts, status, state, error}).
-
--define(SHORT_HEALTHCHECK_INTERVAL, 1000).
--define(HEALTHCHECK_INTERVAL, 15000).
--define(ETS_TABLE, emqx_resource_manager).
--define(TIME_DIVISOR, 100).
--define(WAIT_FOR_RESOURCE_DELAY, 100).
-
-%%------------------------------------------------------------------------------
-%% API
-%%------------------------------------------------------------------------------
-
-%% @doc Called from emqx_resource when starting a resource instance.
-%%
-%% Triggers the emqx_resource_manager_sup supervisor to actually create
-%% and link the process itself if not already started.
--spec ensure_resource(
-    instance_id(),
-    resource_group(),
-    resource_type(),
-    resource_config(),
-    create_opts()
-) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-ensure_resource(InstId, Group, ResourceType, Config, Opts) ->
-    case lookup(InstId) of
-        {ok, _Group, Data} ->
-            {ok, Data};
-        {error, not_found} ->
-            case do_start(InstId, Group, ResourceType, Config, Opts) of
-                ok ->
-                    {ok, _Group, Data} = lookup(InstId),
-                    {ok, Data};
-                Error ->
-                    Error
-            end
-    end.
-
-%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
-%%
-%% Triggers the `emqx_resource_manager_sup` supervisor to actually create
-%% and link the process itself if not already started, and then immedately stops.
--spec create_dry_run(resource_type(), resource_config()) ->
-    ok | {error, Reason :: term()}.
-create_dry_run(ResourceType, Config) ->
-    InstId = make_test_id(),
-    case do_start(InstId, <<"dry_run">>, ResourceType, Config, #{}) of
-        ok ->
-            stop(InstId);
-        Error ->
-            stop(InstId),
-            Error
-    end.
-
-%% @doc Called from emqx_resource when recreating a resource which may or may not exist
--spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
-    {ok, resource_data()} | {error, Reason :: term()}.
-recreate(InstId, ResourceType, NewConfig, Opts) ->
-    case lookup(InstId) of
-        {ok, Group, #{mod := ResourceType, status := connected} = _Data} ->
-            %% If this resource is in use (status='connected'), we should make sure
-            %% the new config is OK before removing the old one.
-            case create_dry_run(ResourceType, NewConfig) of
-                ok ->
-                    remove(InstId, false),
-                    ensure_resource(InstId, Group, ResourceType, NewConfig, Opts);
-                Error ->
-                    Error
-            end;
-        {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
-            remove(InstId, false),
-            ensure_resource(InstId, Group, ResourceType, NewConfig, Opts);
-        {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
-            {error, updating_to_incorrect_resource_type};
-        {error, not_found} ->
-            {error, not_found}
-    end.
-
-%% @doc Stops a running resource_manager and clears the metrics for the resource
--spec remove(instance_id()) -> ok | {error, Reason :: term()}.
-remove(InstId) when is_binary(InstId) ->
-    remove(InstId, true).
-
-%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
--spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
-remove(InstId, ClearMetrics) when is_binary(InstId) ->
-    safe_call(InstId, {remove, ClearMetrics}).
-
-%% @doc Stops and then starts an instance that was already running
--spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
-restart(InstId, Opts) when is_binary(InstId) ->
-    case lookup(InstId) of
-        {ok, Group, #{mod := ResourceType, config := Config} = _Data} ->
-            remove(InstId),
-            do_start(InstId, Group, ResourceType, Config, Opts);
-        Error ->
-            Error
-    end.
-
-%% @doc Stop the resource manager process
--spec stop(instance_id()) -> ok | {error, Reason :: term()}.
-stop(InstId) ->
-    safe_call(InstId, stop).
-
-%% @doc Test helper
--spec set_resource_status_connecting(instance_id()) -> ok.
-set_resource_status_connecting(InstId) ->
-    safe_call(InstId, set_resource_status_connecting).
-
-%% @doc Lookup the group and data of a resource
--spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
-lookup(InstId) ->
-    safe_call(InstId, lookup).
-
-%% @doc Lookup the group and data of a resource
--spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
-ets_lookup(InstId) ->
-    case ets:lookup(?ETS_TABLE, InstId) of
-        [{_id, Group, Data}] ->
-            {ok, Group, data_record_to_external_map(Data)};
-        [] ->
-            {error, not_found}
-    end.
-
-%% @doc Reset the metrics for the specified resource
--spec reset_metrics(instance_id()) -> ok.
-reset_metrics(InstId) ->
-    emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId).
-
-%% @doc Returns the data for all resorces
--spec list_all() -> [resource_data()] | [].
-list_all() ->
-    try
-        [
-            data_record_to_external_map(Data)
-         || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE)
-        ]
-    catch
-        error:badarg -> []
-    end.
-
-%% @doc Returns a list of ids for all the resources in a group
--spec list_group(resource_group()) -> [instance_id()].
-list_group(Group) ->
-    List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
-    lists:flatten(List).
-
--spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
-health_check(InstId) ->
-    safe_call(InstId, health_check).
-
-%% Server start/stop callbacks
-
-%% @doc Function called from the supervisor to actually start the server
-start_link(InstId, Group, ResourceType, Config, Opts) ->
-    Data = #data{
-        id = InstId,
-        group = Group,
-        mod = ResourceType,
-        config = Config,
-        opts = Opts,
-        status = undefined,
-        state = undefined,
-        error = undefined
-    },
-    gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
-
-init(Data) ->
-    {ok, connecting, Data}.
-
-terminate(_Reason, _State, Data) ->
-    ets:delete(?ETS_TABLE, Data#data.id),
-    ok.
-
-%% Behavior callback
-
-callback_mode() -> [handle_event_function, state_enter].
-
-%% Common event Function
-
-%% Called when the resource needs to be stopped
-handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
-    {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
-handle_event({call, From}, stop, _State, #data{status = disconnected} = _Data) ->
-    {keep_state_and_data, [{reply, From, ok}]};
-handle_event({call, From}, stop, _State, Data) ->
-    Result = do_stop(Data),
-    {next_state, disconnected, Data, [{reply, From, Result}]};
-handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
-    handle_remove_event(From, ClearMetrics, Data);
-handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
-    Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
-    {keep_state_and_data, [{reply, From, Reply}]};
-handle_event({call, From}, health_check, disconnected, Data) ->
-    Actions = [{reply, From, {error, Data#data.error}}],
-    {keep_state_and_data, Actions};
-handle_event({call, From}, health_check, _State, Data) ->
-    handle_health_check_event(From, Data);
-handle_event(enter, connecting, connecting, Data) ->
-    handle_connecting_state_enter_event(Data);
-handle_event(enter, _OldState, connecting, Data) ->
-    Actions = [{state_timeout, 0, healthcheck}],
-    {next_state, connecting, Data, Actions};
-handle_event(state_timeout, healthcheck, connecting, #data{status = disconnected} = Data) ->
-    {next_state, disconnected, Data};
-handle_event(state_timeout, healthcheck, connecting, Data) ->
-    connecting_healthcheck(Data);
-%% The connected state is entered after a successful start of the callback mod
-%% and successful healthchecks
-handle_event(enter, _OldState, connected, Data) ->
-    Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, healthcheck}],
-    {next_state, connected, Data, Actions};
-handle_event(state_timeout, healthcheck, connected, Data) ->
-    perform_connected_healthcheck(Data);
-handle_event(enter, _OldState, disconnected, #data{id = InstId} = Data) ->
-    UpdatedData = Data#data{status = disconnected},
-    ets:delete(?ETS_TABLE, InstId),
-    {next_state, disconnected, UpdatedData}.
-
-%%------------------------------------------------------------------------------
-%% internal functions
-%%------------------------------------------------------------------------------
-
-handle_connecting_state_enter_event(Data) ->
-    case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
-        {ok, ResourceState} ->
-            UpdatedData = Data#data{state = ResourceState, status = connecting},
-            %% Perform an initial healthcheck immediately before transitioning into a connected state
-            Actions = [{state_timeout, 0, healthcheck}],
-            {next_state, connecting, UpdatedData, Actions};
-        {error, Reason} ->
-            %% Keep track of the error reason why the connection did not work
-            %% so that the Reason can be returned when the verification call is made.
-            UpdatedData = Data#data{status = disconnected, error = Reason},
-            Actions = [{state_timeout, 0, healthcheck}],
-            {next_state, connecting, UpdatedData, Actions}
-    end.
-
-handle_health_check_event(From, Data) ->
-    case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
-        {ok, ResourceState} ->
-            UpdatedData = Data#data{state = ResourceState, status = connected, error = undefined},
-            ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
-            Actions = [{reply, From, ok}],
-            {next_state, connected, UpdatedData, Actions};
-        {error, Reason} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{error = Reason},
-            ets:delete(?ETS_TABLE, Data#data.id),
-            Actions = [{reply, From, {error, Reason}}],
-            {next_state, connecting, UpdatedData, Actions};
-        {error, Reason, ResourceState} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{state = ResourceState, error = Reason},
-            ets:delete(?ETS_TABLE, Data#data.id),
-            Actions = [{reply, From, {error, Reason}}],
-            {next_state, connecting, UpdatedData, Actions}
-    end.
-
-handle_remove_event(From, ClearMetrics, Data) ->
-    do_stop(Data),
-    ets:delete(?ETS_TABLE, Data#data.id),
-    case ClearMetrics of
-        true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, Data#data.id);
-        false -> ok
-    end,
-    {stop_and_reply, normal, [{reply, From, ok}]}.
-
-do_start(InstId, Group, ResourceType, Config, Opts) ->
-    % The state machine will make the actual call to the callback/resource module after init
-    emqx_resource_manager_sup:start_child(InstId, Group, ResourceType, Config, Opts),
-    case wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)) of
-        ok ->
-            ok = emqx_plugin_libs_metrics:create_metrics(
-                resource_metrics,
-                InstId,
-                [matched, success, failed, exception],
-                [matched]
-            ),
-            ok;
-        timeout ->
-            {error, timeout}
-    end.
-
-do_stop(Data) ->
-    Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
-    ets:delete(?ETS_TABLE, Data#data.id),
-    Result.
-
-proc_name(Id) ->
-    binary_to_atom(Id).
-
-connecting_healthcheck(Data) ->
-    case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
-        {ok, ResourceState} ->
-            UpdatedData = Data#data{state = ResourceState, status = connected, error = undefined},
-            ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
-            {next_state, connected, UpdatedData};
-        {error, Reason} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{error = Reason},
-            Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, healthcheck}],
-            {keep_state, UpdatedData, Actions};
-        {error, Reason, ResourceState} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{state = ResourceState, error = Reason},
-            Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, healthcheck}],
-            {keep_state, UpdatedData, Actions}
-    end.
-
-perform_connected_healthcheck(Data) ->
-    case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
-        {ok, ResourceState} ->
-            UpdatedData = Data#data{state = ResourceState},
-            ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
-            Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, healthcheck}],
-            {keep_state, UpdatedData, Actions};
-        {error, Reason} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{error = Reason},
-            ets:delete(?ETS_TABLE, Data#data.id),
-            {next_state, connecting, UpdatedData};
-        {error, Reason, ResourceState} ->
-            logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]),
-            UpdatedData = Data#data{state = ResourceState, error = Reason},
-            ets:delete(?ETS_TABLE, Data#data.id),
-            {next_state, connecting, UpdatedData}
-    end.
-
-data_record_to_external_map(Data) ->
-    #{
-        id => Data#data.id,
-        mod => Data#data.mod,
-        config => Data#data.config,
-        status => Data#data.status,
-        state => Data#data.state
-    }.
-
-data_record_to_external_map_with_metrics(Data) ->
-    DataMap = data_record_to_external_map(Data),
-    DataMap#{metrics => get_metrics(Data#data.id)}.
-
-make_test_id() ->
-    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
-    <<?TEST_ID_PREFIX, RandId/binary>>.
-
-wait_for_resource_ready(InstId, WaitTime) ->
-    do_wait_for_resource_ready(InstId, WaitTime div ?TIME_DIVISOR).
-
-do_wait_for_resource_ready(_InstId, 0) ->
-    timeout;
-do_wait_for_resource_ready(InstId, Retry) ->
-    case lookup(InstId) of
-        {ok, _Group, #{status := connected}} ->
-            ok;
-        _ ->
-            timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
-            do_wait_for_resource_ready(InstId, Retry - 1)
-    end.
-
-%% @doc Get the metrics for the specified resource
-get_metrics(InstId) ->
-    emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
-
-safe_call(InstId, Message) ->
-    try
-        gen_statem:call(proc_name(InstId), Message)
-    catch
-        exit:{noproc, _} ->
-            {error, not_found}
-    end.

+ 0 - 44
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -1,44 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_resource_manager_sup).
-
--behaviour(supervisor).
-
--export([start_child/5]).
-
--export([start_link/0]).
-
--export([init/1]).
-
-start_child(InstId, Group, ResourceType, Config, Opts) ->
-    supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]).
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init([]) ->
-    TabOpts = [named_table, set, public, {read_concurrency, true}],
-    _ = ets:new(emqx_resource_manager, TabOpts),
-
-    ChildSpecs = [#{id => emqx_resource_manager,
-                    start => {emqx_resource_manager, start_link, []},
-                    restart => transient,
-                    shutdown => brutal_kill,
-                    type => worker,
-                    modules => [emqx_resource_manager]}],
-
-    SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
-    {ok, {SupFlags, ChildSpecs}}.

+ 30 - 6
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -34,9 +34,33 @@ init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics),
     Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics),
 
 
-    ResourceManager = 
-            #{id => emqx_resource_manager_sup,
-              start => {emqx_resource_manager_sup, start_link, []},
-              restart => permanent,
-              shutdown => infinity, type => supervisor, modules => [emqx_resource_manager_sup]},
-    {ok, {SupFlags, [Metrics, ResourceManager]}}.
+    Pool = ?RESOURCE_INST_MOD,
+    Mod = ?RESOURCE_INST_MOD,
+    ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
+    ResourceInsts = [
+        begin
+            ensure_pool_worker(Pool, {Pool, Idx}, Idx),
+            #{id => {Mod, Idx},
+              start => {Mod, start_link, [Pool, Idx]},
+              restart => transient,
+              shutdown => 5000, type => worker, modules => [Mod]}
+        end || Idx <- lists:seq(1, ?POOL_SIZE)],
+    HealthCheck = 
+            #{id => emqx_resource_health_check_sup,
+              start => {emqx_resource_health_check_sup, start_link, []},
+              restart => transient,
+              shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]},
+    {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
+
+%% internal functions
+ensure_pool(Pool, Type, Opts) ->
+    try gproc_pool:new(Pool, Type, Opts)
+    catch
+        error:exists -> ok
+    end.
+
+ensure_pool_worker(Pool, Name, Slot) ->
+    try gproc_pool:add_worker(Pool, Name, Slot)
+    catch
+        error:exists -> ok
+    end.

+ 11 - 10
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -71,7 +71,7 @@ t_create_remove(_) ->
                 ?TEST_RESOURCE,
                 ?TEST_RESOURCE,
                 #{name => test_resource}),
                 #{name => test_resource}),
 
 
-    {ok, _} = emqx_resource:recreate(
+    emqx_resource:recreate(
                 ?ID,
                 ?ID,
                 ?TEST_RESOURCE,
                 ?TEST_RESOURCE,
                 #{name => test_resource},
                 #{name => test_resource},
@@ -166,6 +166,7 @@ t_healthy(_) ->
                 #{name => <<"test_resource">>}),
                 #{name => <<"test_resource">>}),
     timer:sleep(400),
     timer:sleep(400),
 
 
+    emqx_resource_health_check:create_checker(?ID, 15000, 10000),
     #{pid := Pid} = emqx_resource:query(?ID, get_state),
     #{pid := Pid} = emqx_resource:query(?ID, get_state),
     timer:sleep(300),
     timer:sleep(300),
     emqx_resource:set_resource_status_connecting(?ID),
     emqx_resource:set_resource_status_connecting(?ID),
@@ -183,7 +184,7 @@ t_healthy(_) ->
         emqx_resource:health_check(?ID)),
         emqx_resource:health_check(?ID)),
 
 
     ?assertMatch(
     ?assertMatch(
-        [],
+        [#{status := connecting}],
         emqx_resource:list_instances_verbose()),
         emqx_resource:list_instances_verbose()),
 
 
     ok = emqx_resource:remove_local(?ID).
     ok = emqx_resource:remove_local(?ID).
@@ -215,7 +216,7 @@ t_stop_start(_) ->
 
 
     ?assertNot(is_process_alive(Pid0)),
     ?assertNot(is_process_alive(Pid0)),
 
 
-    ?assertMatch({error, {emqx_resource, #{reason := not_found}}},
+    ?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
         emqx_resource:query(?ID, get_state)),
         emqx_resource:query(?ID, get_state)),
 
 
     ok = emqx_resource:restart(?ID),
     ok = emqx_resource:restart(?ID),
@@ -253,7 +254,7 @@ t_stop_start_local(_) ->
 
 
     ?assertNot(is_process_alive(Pid0)),
     ?assertNot(is_process_alive(Pid0)),
 
 
-    ?assertMatch({error, {emqx_resource, #{reason := not_found}}},
+    ?assertMatch({error, {emqx_resource, #{reason := disconnected}}},
         emqx_resource:query(?ID, get_state)),
         emqx_resource:query(?ID, get_state)),
 
 
     ok = emqx_resource:restart(?ID),
     ok = emqx_resource:restart(?ID),
@@ -294,17 +295,17 @@ t_create_dry_run_local(_) ->
     ?assertEqual(undefined, whereis(test_resource)).
     ?assertEqual(undefined, whereis(test_resource)).
 
 
 t_create_dry_run_local_failed(_) ->
 t_create_dry_run_local_failed(_) ->
-    {Res1, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
+    {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
                        #{cteate_error => true}),
                        #{cteate_error => true}),
-    ?assertEqual(error, Res1),
+    ?assertEqual(error, Res),
 
 
-    {Res2, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
+    {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
                        #{name => test_resource, health_check_error => true}),
                        #{name => test_resource, health_check_error => true}),
-    ?assertEqual(error, Res2),
+    ?assertEqual(error, Res),
 
 
-    {Res3, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
+    {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE,
                        #{name => test_resource, stop_error => true}),
                        #{name => test_resource, stop_error => true}),
-    ?assertEqual(error, Res3).
+    ?assertEqual(error, Res).
 
 
 t_test_func(_) ->
 t_test_func(_) ->
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),