|
@@ -101,16 +101,11 @@ do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
|
|
|
NConf = Conf ++ Sources,
|
|
NConf = Conf ++ Sources,
|
|
|
ok = check_dup_types(NConf),
|
|
ok = check_dup_types(NConf),
|
|
|
NConf;
|
|
NConf;
|
|
|
-do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source},
|
|
|
|
|
- Conf) when is_map(Source), is_list(Conf),
|
|
|
|
|
- Type =:= mongodb;
|
|
|
|
|
- Type =:= mysql;
|
|
|
|
|
- Type =:= postgresql;
|
|
|
|
|
- Type =:= redis ->
|
|
|
|
|
- {_Old, Front, Rear} = take(Type, Conf),
|
|
|
|
|
- [NSource] = check_sources([Source]),
|
|
|
|
|
- case emqx_resource:create_dry_run(connector_module(Type), NSource) of
|
|
|
|
|
|
|
+do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when is_map(Source),
|
|
|
|
|
+ is_list(Conf) ->
|
|
|
|
|
+ case create_dry_run(Type, Source) of
|
|
|
ok ->
|
|
ok ->
|
|
|
|
|
+ {_Old, Front, Rear} = take(Type, Conf),
|
|
|
NConf = Front ++ [Source | Rear],
|
|
NConf = Front ++ [Source | Rear],
|
|
|
ok = check_dup_types(NConf),
|
|
ok = check_dup_types(NConf),
|
|
|
NConf;
|
|
NConf;
|
|
@@ -202,6 +197,22 @@ check_dup_types([Source | Sources], Checked) ->
|
|
|
check_dup_types(Sources, [Type | Checked])
|
|
check_dup_types(Sources, [Type | Checked])
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+create_dry_run(T, Source) ->
|
|
|
|
|
+ case is_connector_source(T) of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ [NSource] = check_sources([Source]),
|
|
|
|
|
+ emqx_resource:create_dry_run(T, NSource);
|
|
|
|
|
+ false ->
|
|
|
|
|
+ ok
|
|
|
|
|
+end.
|
|
|
|
|
+
|
|
|
|
|
+is_connector_source(http) -> true;
|
|
|
|
|
+is_connector_source(mongodb) -> true;
|
|
|
|
|
+is_connector_source(mysql) -> true;
|
|
|
|
|
+is_connector_source(postgresql) -> true;
|
|
|
|
|
+is_connector_source(redis) -> true;
|
|
|
|
|
+is_connector_source(_) -> false.
|
|
|
|
|
+
|
|
|
init_sources(Sources) ->
|
|
init_sources(Sources) ->
|
|
|
{_Enabled, Disabled} = lists:partition(fun(#{enable := Enable}) -> Enable end, Sources),
|
|
{_Enabled, Disabled} = lists:partition(fun(#{enable := Enable}) -> Enable end, Sources),
|
|
|
case Disabled =/= [] of
|
|
case Disabled =/= [] of
|