|
@@ -416,7 +416,7 @@ uninstall_bridge_v2(
|
|
|
{error, _} ->
|
|
{error, _} ->
|
|
|
ok;
|
|
ok;
|
|
|
ok ->
|
|
ok ->
|
|
|
- %% Deinstall from connector
|
|
|
|
|
|
|
+ %% uninstall from connector
|
|
|
ConnectorId = emqx_connector_resource:resource_id(
|
|
ConnectorId = emqx_connector_resource:resource_id(
|
|
|
connector_type(BridgeV2Type), ConnectorName
|
|
connector_type(BridgeV2Type), ConnectorName
|
|
|
),
|
|
),
|
|
@@ -869,6 +869,8 @@ config_key_path() ->
|
|
|
config_key_path_leaf() ->
|
|
config_key_path_leaf() ->
|
|
|
[?ROOT_KEY, '?', '?'].
|
|
[?ROOT_KEY, '?', '?'].
|
|
|
|
|
|
|
|
|
|
+pre_config_update(_, {force_update, Conf}, _OldConf) ->
|
|
|
|
|
+ {ok, Conf};
|
|
|
%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the
|
|
%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the
|
|
|
%% underlying resources.
|
|
%% underlying resources.
|
|
|
pre_config_update(_, {_Oper, _, _}, undefined) ->
|
|
pre_config_update(_, {_Oper, _, _}, undefined) ->
|
|
@@ -882,15 +884,57 @@ pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) ->
|
|
|
operation_to_enable(disable) -> false;
|
|
operation_to_enable(disable) -> false;
|
|
|
operation_to_enable(enable) -> true.
|
|
operation_to_enable(enable) -> true.
|
|
|
|
|
|
|
|
|
|
+%% A public API that can trigger this is:
|
|
|
|
|
+%% bin/emqx ctl conf load data/configs/cluster.hocon
|
|
|
|
|
+post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) ->
|
|
|
|
|
+ do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false});
|
|
|
%% This top level handler will be triggered when the actions path is updated
|
|
%% This top level handler will be triggered when the actions path is updated
|
|
|
%% with calls to emqx_conf:update([actions], BridgesConf, #{}).
|
|
%% with calls to emqx_conf:update([actions], BridgesConf, #{}).
|
|
|
%%
|
|
%%
|
|
|
-%% A public API that can trigger this is:
|
|
|
|
|
-%% bin/emqx ctl conf load data/configs/cluster.hocon
|
|
|
|
|
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
|
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
|
|
|
|
+ do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true});
|
|
|
|
|
+post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
|
|
|
|
|
+ Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]),
|
|
|
|
|
+ ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf),
|
|
|
|
|
+ Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])),
|
|
|
|
|
+ reload_message_publish_hook(Bridges),
|
|
|
|
|
+ ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
+ ok;
|
|
|
|
|
+post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
|
|
|
|
|
+ %% N.B.: all bridges must use the same field name (`connector`) to define the
|
|
|
|
|
+ %% connector name.
|
|
|
|
|
+ ConnectorName = maps:get(connector, NewConf),
|
|
|
|
|
+ case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
|
|
|
|
+ ok ->
|
|
|
|
|
+ ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
|
|
|
|
+ Bridges = emqx_utils_maps:deep_put(
|
|
|
|
|
+ [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
|
|
|
|
+ ),
|
|
|
|
|
+ reload_message_publish_hook(Bridges),
|
|
|
|
|
+ ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
+ ok;
|
|
|
|
|
+ {error, Error} ->
|
|
|
|
|
+ {error, Error}
|
|
|
|
|
+ end;
|
|
|
|
|
+post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
|
|
|
|
|
+ ConnectorName = maps:get(connector, NewConf),
|
|
|
|
|
+ case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
|
|
|
|
+ ok ->
|
|
|
|
|
+ ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
|
|
|
|
|
+ ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
|
|
|
|
+ Bridges = emqx_utils_maps:deep_put(
|
|
|
|
|
+ [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
|
|
|
|
+ ),
|
|
|
|
|
+ reload_message_publish_hook(Bridges),
|
|
|
|
|
+ ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
+ ok;
|
|
|
|
|
+ {error, Error} ->
|
|
|
|
|
+ {error, Error}
|
|
|
|
|
+ end.
|
|
|
|
|
+
|
|
|
|
|
+do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) ->
|
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
|
diff_confs(NewConf, OldConf),
|
|
diff_confs(NewConf, OldConf),
|
|
|
- %% new and updated bridges must have their connector references validated
|
|
|
|
|
UpdatedConfigs =
|
|
UpdatedConfigs =
|
|
|
lists:map(
|
|
lists:map(
|
|
|
fun({{Type, BridgeName}, {_Old, New}}) ->
|
|
fun({{Type, BridgeName}, {_Old, New}}) ->
|
|
@@ -906,7 +950,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
|
|
maps:to_list(Added)
|
|
maps:to_list(Added)
|
|
|
),
|
|
),
|
|
|
ToValidate = UpdatedConfigs ++ AddedConfigs,
|
|
ToValidate = UpdatedConfigs ++ AddedConfigs,
|
|
|
- case multi_validate_referenced_connectors(ToValidate) of
|
|
|
|
|
|
|
+ case multi_validate_referenced_connectors(NeedValidate, ToValidate) of
|
|
|
ok ->
|
|
ok ->
|
|
|
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
|
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
|
|
RemoveFun = fun uninstall_bridge_v2/3,
|
|
RemoveFun = fun uninstall_bridge_v2/3,
|
|
@@ -930,44 +974,6 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
|
|
|
Result;
|
|
Result;
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
|
{error, Error}
|
|
{error, Error}
|
|
|
- end;
|
|
|
|
|
-post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
|
|
|
|
|
- Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]),
|
|
|
|
|
- ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf),
|
|
|
|
|
- Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])),
|
|
|
|
|
- reload_message_publish_hook(Bridges),
|
|
|
|
|
- ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
- ok;
|
|
|
|
|
-post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
|
|
|
|
|
- %% N.B.: all bridges must use the same field name (`connector`) to define the
|
|
|
|
|
- %% connector name.
|
|
|
|
|
- ConnectorName = maps:get(connector, NewConf),
|
|
|
|
|
- case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
|
|
|
|
- ok ->
|
|
|
|
|
- ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
|
|
|
|
- Bridges = emqx_utils_maps:deep_put(
|
|
|
|
|
- [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
|
|
|
|
- ),
|
|
|
|
|
- reload_message_publish_hook(Bridges),
|
|
|
|
|
- ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
- ok;
|
|
|
|
|
- {error, Error} ->
|
|
|
|
|
- {error, Error}
|
|
|
|
|
- end;
|
|
|
|
|
-post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
|
|
|
|
|
- ConnectorName = maps:get(connector, NewConf),
|
|
|
|
|
- case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
|
|
|
|
|
- ok ->
|
|
|
|
|
- ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
|
|
|
|
|
- ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
|
|
|
|
|
- Bridges = emqx_utils_maps:deep_put(
|
|
|
|
|
- [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
|
|
|
|
|
- ),
|
|
|
|
|
- reload_message_publish_hook(Bridges),
|
|
|
|
|
- ?tp(bridge_post_config_update_done, #{}),
|
|
|
|
|
- ok;
|
|
|
|
|
- {error, Error} ->
|
|
|
|
|
- {error, Error}
|
|
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
diff_confs(NewConfs, OldConfs) ->
|
|
diff_confs(NewConfs, OldConfs) ->
|
|
@@ -1600,7 +1606,9 @@ to_connector(ConnectorNameBin, BridgeType) ->
|
|
|
throw(not_found)
|
|
throw(not_found)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-multi_validate_referenced_connectors(Configs) ->
|
|
|
|
|
|
|
+multi_validate_referenced_connectors(false, _Configs) ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+multi_validate_referenced_connectors(true, Configs) ->
|
|
|
Pipeline =
|
|
Pipeline =
|
|
|
lists:map(
|
|
lists:map(
|
|
|
fun({Type, BridgeName, #{connector := ConnectorName}}) ->
|
|
fun({Type, BridgeName, #{connector := ConnectorName}}) ->
|