|
|
@@ -34,6 +34,7 @@
|
|
|
manager_pid :: pid(),
|
|
|
st_err :: st_err(),
|
|
|
config :: term(),
|
|
|
+ stable :: term(),
|
|
|
extra = []
|
|
|
}).
|
|
|
|
|
|
@@ -69,6 +70,13 @@ write(ManagerPid, Group, Data) ->
|
|
|
status := Status,
|
|
|
added_channels := AddedChannels
|
|
|
} = Data,
|
|
|
+ Stable = #{
|
|
|
+ mod => Mod,
|
|
|
+ callback_mode => CallbackMode,
|
|
|
+ query_mode => QueryMode,
|
|
|
+ state => State
|
|
|
+ },
|
|
|
+ IsDryrun = emqx_resource:is_dry_run(ID),
|
|
|
Connector = #connector{
|
|
|
id = ID,
|
|
|
group = Group,
|
|
|
@@ -78,19 +86,27 @@ write(ManagerPid, Group, Data) ->
|
|
|
error => external_error(Error)
|
|
|
},
|
|
|
config = Config,
|
|
|
+ stable =
|
|
|
+ case IsDryrun of
|
|
|
+ true ->
|
|
|
+ %% save stable part in ets for dryrun
|
|
|
+ Stable;
|
|
|
+ false ->
|
|
|
+ undefined
|
|
|
+ end,
|
|
|
extra = []
|
|
|
},
|
|
|
Channels = lists:map(fun to_channel_record/1, maps:to_list(AddedChannels)),
|
|
|
- Stable = #{
|
|
|
- mod => Mod,
|
|
|
- callback_mode => CallbackMode,
|
|
|
- query_mode => QueryMode,
|
|
|
- state => State
|
|
|
- },
|
|
|
%% erase old channels (if any)
|
|
|
ok = erase_old_channels(ID, maps:keys(AddedChannels)),
|
|
|
%% put stable state in persistent_term
|
|
|
- ok = put_state_pt(ID, Stable),
|
|
|
+ case IsDryrun of
|
|
|
+ true ->
|
|
|
+ %% do not write persistent_term for dryrun
|
|
|
+ ok;
|
|
|
+ false ->
|
|
|
+ ok = put_state_pt(ID, Stable)
|
|
|
+ end,
|
|
|
%% insert connector and channel states
|
|
|
true = ets:insert(?RESOURCE_STATE_CACHE, [Connector | Channels]),
|
|
|
ok.
|
|
|
@@ -118,13 +134,22 @@ read_manager_pid(ID) ->
|
|
|
|
|
|
-spec read_mod(resource_id()) -> not_found | {ok, module()}.
|
|
|
read_mod(ID) ->
|
|
|
- case get_state_pt(ID) of
|
|
|
+ case get_state(ID) of
|
|
|
undefined ->
|
|
|
not_found;
|
|
|
#{mod := Mod} ->
|
|
|
{ok, Mod}
|
|
|
end.
|
|
|
|
|
|
+get_state(ID) ->
|
|
|
+ case get_state_pt(ID) of
|
|
|
+ undefined ->
|
|
|
+ %% maybe it's a dryrun connector
|
|
|
+ ets:lookup_element(?RESOURCE_STATE_CACHE, ConnectorId, #connector.stable, undefined);
|
|
|
+ InPt ->
|
|
|
+ InPt
|
|
|
+ end.
|
|
|
+
|
|
|
-spec erase(resource_id()) -> ok.
|
|
|
erase(ID) ->
|
|
|
MS = ets:fun2ms(fun(#channel{id = {C, _}}) when C =:= ID -> true end),
|
|
|
@@ -172,7 +197,7 @@ all_ids() ->
|
|
|
get_runtime(ID) ->
|
|
|
ChanKey = {ConnectorId, _ChanID} = split_channel_id(ID),
|
|
|
try
|
|
|
- Stable = get_state_pt(ConnectorId),
|
|
|
+ Stable = get_state(ConnectorId),
|
|
|
ChannelStatus = get_channel_status(ChanKey),
|
|
|
StErr = ets:lookup_element(?RESOURCE_STATE_CACHE, ConnectorId, #connector.st_err),
|
|
|
{ok, #rt{
|
|
|
@@ -253,15 +278,22 @@ make_resource_data(ID, Connector, Channels) ->
|
|
|
error := Error,
|
|
|
status := Status
|
|
|
},
|
|
|
- config = Config
|
|
|
+ config = Config,
|
|
|
+ stable = Stable0
|
|
|
} = Connector,
|
|
|
+ Stable =
|
|
|
+ case Stable0 of
|
|
|
+ undefined ->
|
|
|
+ get_state_pt(ID);
|
|
|
+ X ->
|
|
|
+ X
|
|
|
+ end,
|
|
|
#{
|
|
|
mod := Mod,
|
|
|
callback_mode := CallbackMode,
|
|
|
query_mode := QueryMode,
|
|
|
state := State
|
|
|
- } = get_state_pt(ID),
|
|
|
-
|
|
|
+ } = Stable,
|
|
|
#{
|
|
|
id => ID,
|
|
|
mod => Mod,
|