|
|
@@ -25,12 +25,12 @@
|
|
|
|
|
|
% API
|
|
|
-export([
|
|
|
- ensure_resource/5,
|
|
|
- recreate/4,
|
|
|
+ ensure_resource/6,
|
|
|
+ recreate/5,
|
|
|
remove/1,
|
|
|
- create_dry_run/2,
|
|
|
create_dry_run/3,
|
|
|
create_dry_run/4,
|
|
|
+ create_dry_run/5,
|
|
|
restart/2,
|
|
|
start/2,
|
|
|
stop/1,
|
|
|
@@ -59,7 +59,7 @@
|
|
|
]).
|
|
|
|
|
|
% Server
|
|
|
--export([start_link/5]).
|
|
|
+-export([start_link/6]).
|
|
|
|
|
|
% Behaviour
|
|
|
-export([init/1, callback_mode/0, handle_event/4, terminate/3]).
|
|
|
@@ -75,6 +75,7 @@
|
|
|
-record(data, {
|
|
|
id,
|
|
|
group,
|
|
|
+ type,
|
|
|
mod,
|
|
|
callback_mode,
|
|
|
query_mode,
|
|
|
@@ -161,43 +162,44 @@
|
|
|
%% Triggers the emqx_resource_manager_sup supervisor to actually create
|
|
|
%% and link the process itself if not already started.
|
|
|
-spec ensure_resource(
|
|
|
+ type(),
|
|
|
resource_id(),
|
|
|
resource_group(),
|
|
|
resource_type(),
|
|
|
resource_config(),
|
|
|
creation_opts()
|
|
|
) -> {ok, resource_data()}.
|
|
|
-ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
+ensure_resource(Type, ResId, Group, ResourceType, Config, Opts) ->
|
|
|
case lookup(ResId) of
|
|
|
{ok, _Group, Data} ->
|
|
|
{ok, Data};
|
|
|
{error, not_found} ->
|
|
|
- create_and_return_data(ResId, Group, ResourceType, Config, Opts)
|
|
|
+ create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts)
|
|
|
end.
|
|
|
|
|
|
%% @doc Called from emqx_resource when recreating a resource which may or may not exist
|
|
|
--spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
|
|
|
+-spec recreate(type(), resource_id(), resource_type(), resource_config(), creation_opts()) ->
|
|
|
{ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
|
|
|
-recreate(ResId, ResourceType, NewConfig, Opts) ->
|
|
|
+recreate(Type, ResId, ResourceType, NewConfig, Opts) ->
|
|
|
case lookup(ResId) of
|
|
|
{ok, Group, #{mod := ResourceType, status := _} = _Data} ->
|
|
|
_ = remove(ResId, false),
|
|
|
- create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts);
|
|
|
+ create_and_return_data(Type, ResId, Group, ResourceType, NewConfig, Opts);
|
|
|
{ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
|
|
|
{error, updating_to_incorrect_resource_type};
|
|
|
{error, not_found} ->
|
|
|
{error, not_found}
|
|
|
end.
|
|
|
|
|
|
-create_and_return_data(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
- _ = create(ResId, Group, ResourceType, Config, Opts),
|
|
|
+create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts) ->
|
|
|
+ _ = create(Type, ResId, Group, ResourceType, Config, Opts),
|
|
|
{ok, _Group, Data} = lookup(ResId),
|
|
|
{ok, Data}.
|
|
|
|
|
|
%% @doc Create a resource_manager and wait until it is running
|
|
|
-create(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
+create(Type, ResId, Group, ResourceType, Config, Opts) ->
|
|
|
% The state machine will make the actual call to the callback/resource module after init
|
|
|
- ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts),
|
|
|
+ ok = emqx_resource_manager_sup:ensure_child(Type, ResId, Group, ResourceType, Config, Opts),
|
|
|
% Create metrics for the resource
|
|
|
ok = emqx_resource:create_metrics(ResId),
|
|
|
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
|
|
|
@@ -219,30 +221,32 @@ create(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
|
|
|
%%
|
|
|
%% Triggers the `emqx_resource_manager_sup` supervisor to actually create
|
|
|
-%% and link the process itself if not already started, and then immedately stops.
|
|
|
--spec create_dry_run(resource_type(), resource_config()) ->
|
|
|
+%% and link the process itself if not already started, and then immediately stops.
|
|
|
+-spec create_dry_run(type(), resource_type(), resource_config()) ->
|
|
|
ok | {error, Reason :: term()}.
|
|
|
-create_dry_run(ResourceType, Config) ->
|
|
|
+create_dry_run(Type, ResourceType, Config) ->
|
|
|
ResId = make_test_id(),
|
|
|
- create_dry_run(ResId, ResourceType, Config).
|
|
|
+ create_dry_run(Type, ResId, ResourceType, Config).
|
|
|
|
|
|
-create_dry_run(ResId, ResourceType, Config) ->
|
|
|
- create_dry_run(ResId, ResourceType, Config, fun do_nothing_on_ready/1).
|
|
|
+create_dry_run(Type, ResId, ResourceType, Config) ->
|
|
|
+ create_dry_run(Type, ResId, ResourceType, Config, fun do_nothing_on_ready/1).
|
|
|
|
|
|
do_nothing_on_ready(_ResId) ->
|
|
|
ok.
|
|
|
|
|
|
--spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) ->
|
|
|
+-spec create_dry_run(type(), resource_id(), resource_type(), resource_config(), OnReadyCallback) ->
|
|
|
ok | {error, Reason :: term()}
|
|
|
when
|
|
|
OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
|
|
|
-create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
|
|
|
+create_dry_run(Type, ResId, ResourceType, Config, OnReadyCallback) ->
|
|
|
Opts =
|
|
|
case is_map(Config) of
|
|
|
true -> maps:get(resource_opts, Config, #{});
|
|
|
false -> #{}
|
|
|
end,
|
|
|
- ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts),
|
|
|
+ ok = emqx_resource_manager_sup:ensure_child(
|
|
|
+ Type, ResId, <<"dry_run">>, ResourceType, Config, Opts
|
|
|
+ ),
|
|
|
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
|
|
|
Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000),
|
|
|
case wait_for_ready(ResId, Timeout) of
|
|
|
@@ -491,7 +495,7 @@ try_clean_allocated_resources(ResId) ->
|
|
|
%% Server start/stop callbacks
|
|
|
|
|
|
%% @doc Function called from the supervisor to actually start the server
|
|
|
-start_link(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
+start_link(Type, ResId, Group, ResourceType, Config, Opts) ->
|
|
|
QueryMode = emqx_resource:query_mode(
|
|
|
ResourceType,
|
|
|
Config,
|
|
|
@@ -499,6 +503,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) ->
|
|
|
),
|
|
|
Data = #data{
|
|
|
id = ResId,
|
|
|
+ type = Type,
|
|
|
group = Group,
|
|
|
mod = ResourceType,
|
|
|
callback_mode = emqx_resource:get_callback_mode(ResourceType),
|
|
|
@@ -683,6 +688,9 @@ handle_event(EventType, EventData, State, Data) ->
|
|
|
error,
|
|
|
#{
|
|
|
msg => "ignore_all_other_events",
|
|
|
+ resource_id => Data#data.id,
|
|
|
+ group => Data#data.group,
|
|
|
+ type => Data#data.type,
|
|
|
event_type => EventType,
|
|
|
event_data => EventData,
|
|
|
state => State,
|
|
|
@@ -752,8 +760,8 @@ handle_remove_event(From, ClearMetrics, Data) ->
|
|
|
|
|
|
start_resource(Data, From) ->
|
|
|
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
|
|
|
- ResId = Data#data.id,
|
|
|
- case emqx_resource:call_start(ResId, Data#data.mod, Data#data.config) of
|
|
|
+ #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data,
|
|
|
+ case emqx_resource:call_start(ResId, Mod, Config) of
|
|
|
{ok, ResourceState} ->
|
|
|
UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
|
|
|
%% Perform an initial health_check immediately before transitioning into a connected state
|
|
|
@@ -764,7 +772,9 @@ start_resource(Data, From) ->
|
|
|
IsDryRun = emqx_resource:is_dry_run(ResId),
|
|
|
?SLOG(log_level(IsDryRun), #{
|
|
|
msg => "start_resource_failed",
|
|
|
- id => ResId,
|
|
|
+ resource_id => ResId,
|
|
|
+ group => Group,
|
|
|
+ type => Type,
|
|
|
reason => Reason
|
|
|
}),
|
|
|
_ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error),
|
|
|
@@ -798,14 +808,20 @@ add_channels(Data) ->
|
|
|
add_channels_in_list([], Data) ->
|
|
|
Data;
|
|
|
add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
|
|
|
- Id = Data#data.id,
|
|
|
+ #data{
|
|
|
+ id = ResId,
|
|
|
+ mod = Mod,
|
|
|
+ state = State,
|
|
|
+ added_channels = AddedChannelsMap,
|
|
|
+ group = Group,
|
|
|
+ type = Type
|
|
|
+ } = Data,
|
|
|
case
|
|
|
emqx_resource:call_add_channel(
|
|
|
- Id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig
|
|
|
+ ResId, Mod, State, ChannelID, ChannelConfig
|
|
|
)
|
|
|
of
|
|
|
{ok, NewState} ->
|
|
|
- AddedChannelsMap = Data#data.added_channels,
|
|
|
%% Set the channel status to connecting to indicate that
|
|
|
%% we have not yet performed the initial health_check
|
|
|
NewAddedChannelsMap = maps:put(
|
|
|
@@ -819,10 +835,12 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
|
|
|
},
|
|
|
add_channels_in_list(Rest, NewData);
|
|
|
{error, Reason} = Error ->
|
|
|
- IsDryRun = emqx_resource:is_dry_run(Id),
|
|
|
+ IsDryRun = emqx_resource:is_dry_run(ResId),
|
|
|
?SLOG(log_level(IsDryRun), #{
|
|
|
msg => "add_channel_failed",
|
|
|
- id => Id,
|
|
|
+ resource_id => ResId,
|
|
|
+ type => Type,
|
|
|
+ group => Group,
|
|
|
channel_id => ChannelID,
|
|
|
reason => Reason
|
|
|
}),
|
|
|
@@ -872,9 +890,15 @@ remove_channels(Data) ->
|
|
|
remove_channels_in_list([], Data, _KeepInChannelMap) ->
|
|
|
Data;
|
|
|
remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
|
|
|
- AddedChannelsMap = Data#data.added_channels,
|
|
|
- Id = Data#data.id,
|
|
|
- IsDryRun = emqx_resource:is_dry_run(Id),
|
|
|
+ #data{
|
|
|
+ id = ResId,
|
|
|
+ added_channels = AddedChannelsMap,
|
|
|
+ mod = Mod,
|
|
|
+ state = State,
|
|
|
+ group = Group,
|
|
|
+ type = Type
|
|
|
+ } = Data,
|
|
|
+ IsDryRun = emqx_resource:is_dry_run(ResId),
|
|
|
NewAddedChannelsMap =
|
|
|
case KeepInChannelMap of
|
|
|
true ->
|
|
|
@@ -883,7 +907,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
|
|
|
_ = maybe_clear_alarm(IsDryRun, ChannelID),
|
|
|
maps:remove(ChannelID, AddedChannelsMap)
|
|
|
end,
|
|
|
- case safe_call_remove_channel(Id, Data#data.mod, Data#data.state, ChannelID) of
|
|
|
+ case safe_call_remove_channel(ResId, Mod, State, ChannelID) of
|
|
|
{ok, NewState} ->
|
|
|
NewData = Data#data{
|
|
|
state = NewState,
|
|
|
@@ -893,7 +917,9 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
|
|
|
{error, Reason} ->
|
|
|
?SLOG(log_level(IsDryRun), #{
|
|
|
msg => "remove_channel_failed",
|
|
|
- id => Id,
|
|
|
+ resource_id => ResId,
|
|
|
+ group => Group,
|
|
|
+ type => Type,
|
|
|
channel_id => ChannelID,
|
|
|
reason => Reason
|
|
|
}),
|
|
|
@@ -997,7 +1023,12 @@ handle_remove_channel(From, ChannelId, Data) ->
|
|
|
end.
|
|
|
|
|
|
handle_remove_channel_exists(From, ChannelId, Data) ->
|
|
|
- #data{id = Id, added_channels = AddedChannelsMap} = Data,
|
|
|
+ #data{
|
|
|
+ id = Id,
|
|
|
+ group = Group,
|
|
|
+ type = Type,
|
|
|
+ added_channels = AddedChannelsMap
|
|
|
+ } = Data,
|
|
|
case
|
|
|
emqx_resource:call_remove_channel(
|
|
|
Id, Data#data.mod, Data#data.state, ChannelId
|
|
|
@@ -1014,7 +1045,9 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
|
|
|
IsDryRun = emqx_resource:is_dry_run(Id),
|
|
|
?SLOG(log_level(IsDryRun), #{
|
|
|
msg => "remove_channel_failed",
|
|
|
- id => Id,
|
|
|
+ resource_id => Id,
|
|
|
+ group => Group,
|
|
|
+ type => Type,
|
|
|
channel_id => ChannelId,
|
|
|
reason => Reason
|
|
|
}),
|
|
|
@@ -1123,10 +1156,13 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
|
|
|
Actions = Replies ++ resource_health_check_actions(Data),
|
|
|
{keep_state, Data, Actions};
|
|
|
_ ->
|
|
|
- IsDryRun = emqx_resource:is_dry_run(Data0#data.id),
|
|
|
+ #data{id = ResId, group = Group, type = Type} = Data0,
|
|
|
+ IsDryRun = emqx_resource:is_dry_run(ResId),
|
|
|
?SLOG(log_level(IsDryRun), #{
|
|
|
msg => "health_check_failed",
|
|
|
- id => Data0#data.id,
|
|
|
+ resource_id => ResId,
|
|
|
+ group => Group,
|
|
|
+ type => Type,
|
|
|
status => NewStatus
|
|
|
}),
|
|
|
%% Note: works because, coincidentally, channel/resource status is a
|
|
|
@@ -1633,6 +1669,8 @@ parse_health_check_result({error, Error}, Data) ->
|
|
|
#{
|
|
|
msg => "health_check_exception",
|
|
|
resource_id => Data#data.id,
|
|
|
+ type => Data#data.type,
|
|
|
+ group => Data#data.group,
|
|
|
reason => Error
|
|
|
}
|
|
|
),
|