Просмотр исходного кода

feat(connector): add API for /connectors_test

Shawn 4 лет назад
Родитель
Сommit
9c93ea0338

+ 4 - 4
apps/emqx/src/emqx_authentication.erl

@@ -79,8 +79,8 @@
         ]).
 
 %% proxy callback
--export([ pre_config_update/2
-        , post_config_update/4
+-export([ pre_config_update/3
+        , post_config_update/5
         ]).
 
 -export_type([ authenticator_id/0
@@ -238,10 +238,10 @@ get_enabled(Authenticators) ->
 %% APIs
 %%------------------------------------------------------------------------------
 
-pre_config_update(UpdateReq, OldConfig) ->
+pre_config_update(_, UpdateReq, OldConfig) ->
     emqx_authentication_config:pre_config_update(UpdateReq, OldConfig).
 
-post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
+post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
     emqx_authentication_config:post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs).
 
 %% @doc Get all registered authentication providers.

+ 6 - 6
apps/emqx/src/emqx_authentication_config.erl

@@ -19,8 +19,8 @@
 
 -behaviour(emqx_config_handler).
 
--export([ pre_config_update/2
-        , post_config_update/4
+-export([ pre_config_update/3
+        , post_config_update/5
         ]).
 
 -export([ authenticator_id/1
@@ -53,9 +53,9 @@
 %% Callbacks of config handler
 %%------------------------------------------------------------------------------
 
--spec pre_config_update(update_request(), emqx_config:raw_config())
+-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config())
     -> {ok, map() | list()} | {error, term()}.
-pre_config_update(UpdateReq, OldConfig) ->
+pre_config_update(_, UpdateReq, OldConfig) ->
     try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
         {error, Reason} -> {error, Reason};
         {ok, NewConfig} -> {ok, return_map(NewConfig)}
@@ -102,9 +102,9 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
             end
     end.
 
--spec post_config_update(update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
+-spec post_config_update(list(atom()), update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
     -> ok | {ok, map()} | {error, term()}.
-post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
+post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
     do_post_config_update(UpdateReq, check_configs(to_list(NewConfig)), OldConfig, AppEnvs).
 
 do_post_config_update({create_authenticator, ChainName, Config}, _NewConfig, _OldConfig, _AppEnvs) ->

+ 36 - 22
apps/emqx/src/emqx_config_handler.erl

@@ -45,14 +45,14 @@
 -type handler_name() :: module().
 -type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
 
--optional_callbacks([ pre_config_update/2
-                    , post_config_update/4
+-optional_callbacks([ pre_config_update/3
+                    , post_config_update/5
                     ]).
 
--callback pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
+-callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) ->
     {ok, emqx_config:update_request()} | {error, term()}.
 
--callback post_config_update(emqx_config:update_request(), emqx_config:config(),
+-callback post_config_update([atom()], emqx_config:update_request(), emqx_config:config(),
     emqx_config:config(), emqx_config:app_envs()) ->
         ok | {ok, Result::any()} | {error, Reason::term()}.
 
@@ -181,14 +181,20 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
         Error -> Error
     end.
 
-do_update_config([], Handlers, OldRawConf, UpdateReq) ->
-    call_pre_config_update(Handlers, OldRawConf, UpdateReq);
-do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
+do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) ->
+    do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []).
+
+do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
+    call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath);
+do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
+        UpdateReq, ConfKeyPath0) ->
+    ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
     SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
     SubHandlers = get_sub_handlers(ConfKey, Handlers),
-    case do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq) of
+    case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
         {ok, NewUpdateReq} ->
-            call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq});
+            call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
+                ConfKeyPath);
         Error ->
             Error
     end.
@@ -211,18 +217,25 @@ check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, Override
         Error -> Error
     end.
 
-do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
-    call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs), Result);
-do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
-        Result) ->
+do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
+    do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
+        Result, []).
+
+do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result,
+        ConfKeyPath) ->
+    call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
+        Result, ConfKeyPath);
+do_post_config_update([ConfKey | SubConfKeyPath], Handlers, OldConf, NewConf, AppEnvs,
+        UpdateArgs, Result, ConfKeyPath0) ->
+    ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
     SubOldConf = get_sub_config(ConfKey, OldConf),
     SubNewConf = get_sub_config(ConfKey, NewConf),
     SubHandlers = get_sub_handlers(ConfKey, Handlers),
-    case do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
-            UpdateArgs, Result) of
+    case do_post_config_update(SubConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
+            UpdateArgs, Result, ConfKeyPath) of
         {ok, Result1} ->
             call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
-                Result1);
+                Result1, ConfKeyPath);
         Error -> Error
     end.
 
@@ -237,22 +250,23 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
 get_sub_config(_, _Conf) -> %% the Conf is a primitive
     undefined.
 
-call_pre_config_update(Handlers, OldRawConf, UpdateReq) ->
+call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
     HandlerName = maps:get(?MOD, Handlers, undefined),
-    case erlang:function_exported(HandlerName, pre_config_update, 2) of
+    case erlang:function_exported(HandlerName, pre_config_update, 3) of
         true ->
-            case HandlerName:pre_config_update(UpdateReq, OldRawConf) of
+            case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
                 {ok, NewUpdateReq} -> {ok, NewUpdateReq};
                 {error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
             end;
         false -> merge_to_old_config(UpdateReq, OldRawConf)
     end.
 
-call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result) ->
+call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
     HandlerName = maps:get(?MOD, Handlers, undefined),
-    case erlang:function_exported(HandlerName, post_config_update, 4) of
+    case erlang:function_exported(HandlerName, post_config_update, 5) of
         true ->
-            case HandlerName:post_config_update(UpdateReq, NewConf, OldConf, AppEnvs) of
+            case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
+                    AppEnvs) of
                 ok -> {ok, Result};
                 {ok, Result1} ->
                     {ok, Result#{HandlerName => Result1}};

+ 2 - 2
apps/emqx/src/emqx_listeners.erl

@@ -46,7 +46,7 @@
         , parse_listener_id/1
         ]).
 
--export([post_config_update/4]).
+-export([post_config_update/5]).
 
 -define(CONF_KEY_PATH, [listeners]).
 -define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]).
@@ -272,7 +272,7 @@ delete_authentication(Type, ListenerName, _Conf) ->
     emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
 
 %% Update the listeners at runtime
-post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->
+post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Updated}
         = diff_listeners(NewListeners, OldListeners),
     perform_listener_changes(fun stop_listener/3, Removed),

+ 2 - 2
apps/emqx/src/emqx_logger.erl

@@ -70,7 +70,7 @@
         , stop_log_handler/1
         ]).
 
--export([post_config_update/4]).
+-export([post_config_update/5]).
 
 -type(peername_str() :: list()).
 -type(logger_dst() :: file:filename() | console | unknown).
@@ -123,7 +123,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 %% emqx_config_handler callbacks
 %%--------------------------------------------------------------------
-post_config_update(_Req, _NewConf, _OldConf, AppEnvs) ->
+post_config_update(_, _Req, _NewConf, _OldConf, AppEnvs) ->
     gen_server:call(?MODULE, {update_config, AppEnvs}, 5000).
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx/src/emqx_schema.erl

@@ -1326,7 +1326,7 @@ to_bar_separated_list(Str) ->
     {ok, string:tokens(Str, "| ")}.
 
 to_ip_port(Str) ->
-    case string:tokens(Str, ":") of
+    case string:tokens(Str, ": ") of
         [Ip, Port] ->
             PortVal = list_to_integer(Port),
             case inet:parse_address(Ip) of
@@ -1377,7 +1377,7 @@ validate_alarm_actions(Actions) ->
     end.
 
 parse_user_lookup_fun(StrConf) ->
-    [ModStr, FunStr] = string:tokens(str(StrConf), ":"),
+    [ModStr, FunStr] = string:tokens(str(StrConf), ": "),
     Mod = list_to_atom(ModStr),
     Fun = list_to_atom(FunStr),
     {fun Mod:Fun/3, undefined}.

+ 4 - 4
apps/emqx_authz/src/emqx_authz.erl

@@ -36,7 +36,7 @@
         , authorize/5
         ]).
 
--export([post_config_update/4, pre_config_update/2]).
+-export([post_config_update/5, pre_config_update/3]).
 
 -export([acl_conf_file/0]).
 
@@ -127,13 +127,13 @@ do_update({_, Sources}, _Conf) when is_list(Sources)->
 do_update({Op, Sources}, Conf) ->
     error({bad_request, #{op => Op, sources => Sources, conf => Conf}}).
 
-pre_config_update(Cmd, Conf) ->
+pre_config_update(_, Cmd, Conf) ->
     {ok, do_update(Cmd, Conf)}.
 
 
-post_config_update(_, undefined, _Conf, _AppEnvs) ->
+post_config_update(_, _, undefined, _Conf, _AppEnvs) ->
     ok;
-post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) ->
+post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
     ok = do_post_update(Cmd, NewSources),
     ok = emqx_authz_cache:drain_cache().
 

+ 4 - 5
apps/emqx_bridge/etc/emqx_bridge.conf

@@ -4,7 +4,7 @@
 
 ## MQTT bridges to/from another MQTT broker
 #bridges.mqtt.my_ingress_mqtt_bridge {
-#    connector = my_mqtt_connector
+#    connector = "mqtt:my_mqtt_connector"
 #    direction = ingress
 #    ## topic mappings for this bridge
 #    from_remote_topic = "aws/#"
@@ -13,11 +13,10 @@
 #    payload = "${payload}"
 #    qos = "${qos}"
 #    retain = "${retain}"
-#
 #}
 #
 #bridges.mqtt.my_egress_mqtt_bridge {
-#    connector = my_mqtt_connector
+#    connector = "mqtt:my_mqtt_connector"
 #    direction = egress
 #    ## topic mappings for this bridge
 #    from_local_topic = "emqx/#"
@@ -26,10 +25,10 @@
 #    qos = 1
 #    retain = false
 #}
-
+#
 ## HTTP bridges to an HTTP server
 #bridges.http.my_http_bridge {
-#    ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
+#    ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
 #    url = "http://localhost:9901/messages/${topic}"
 #    request_timeout = "30s"
 #    connect_timeout = "30s"

+ 37 - 7
apps/emqx_bridge/src/emqx_bridge.erl

@@ -18,7 +18,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--export([post_config_update/4]).
+-export([post_config_update/5]).
 
 -export([ load_hook/0
         , reload_hook/0
@@ -39,6 +39,9 @@
         , lookup/3
         , list/0
         , create/3
+        , recreate/2
+        , recreate/3
+        , create_dry_run/2
         , remove/3
         , update/3
         , start/2
@@ -90,13 +93,15 @@ send_message(BridgeId, Message) ->
 config_key_path() ->
     [bridges].
 
+resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
 resource_type(mqtt) -> emqx_connector_mqtt;
+resource_type(<<"http">>) -> emqx_connector_http;
 resource_type(http) -> emqx_connector_http.
 
 bridge_type(emqx_connector_mqtt) -> mqtt;
 bridge_type(emqx_connector_http) -> http.
 
-post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
+post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated}
         = diff_confs(NewConf, OldConf),
     Result = perform_bridge_changes([
@@ -179,7 +184,7 @@ create(Type, Name, Conf) ->
     ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
         config => Conf}),
     ResId = resource_id(Type, Name),
-    case emqx_resource:create(ResId,
+    case emqx_resource:create_local(ResId,
             emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
         {ok, already_created} ->
             emqx_resource:get_instance(ResId);
@@ -200,12 +205,27 @@ update(Type, Name, {_OldConf, Conf}) ->
     %%
     ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
         config => Conf}),
-    emqx_resource:recreate(resource_id(Type, Name),
+    recreate(Type, Name, Conf).
+
+recreate(Type, Name) ->
+    recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
+
+recreate(Type, Name, Conf) ->
+    emqx_resource:recreate_local(resource_id(Type, Name),
         emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
 
+create_dry_run(Type, Conf) ->
+    Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}},
+    case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
+        {ok, Conf1} ->
+            emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
+        {error, _} = Error ->
+            Error
+    end.
+
 remove(Type, Name, _Conf) ->
     ?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
-    case emqx_resource:remove(resource_id(Type, Name)) of
+    case emqx_resource:remove_local(resource_id(Type, Name)) of
         ok -> ok;
         {error, not_found} -> ok;
         {error, Reason} ->
@@ -264,8 +284,18 @@ parse_confs(http, _Name,
              , request_timeout => ReqTimeout
              }
          };
-parse_confs(Type, Name, #{connector := ConnName, direction := Direction} = Conf) ->
-    ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
+parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
+        when is_binary(ConnId) ->
+    case emqx_connector:parse_connector_id(ConnId) of
+        {Type, ConnName} ->
+            ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
+            make_resource_confs(Direction, ConnectorConfs,
+                maps:without([connector, direction], Conf), Name);
+        {_ConnType, _ConnName} ->
+            error({cannot_use_connector_with_different_type, ConnId})
+    end;
+parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
+        when is_map(ConnectorConfs) ->
     make_resource_confs(Direction, ConnectorConfs,
         maps:without([connector, direction], Conf), Name).
 

+ 52 - 53
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -22,8 +22,8 @@
 -export([ list_create_bridges_in_cluster/2
         , list_local_bridges/1
         , crud_bridges_in_cluster/2
-        , crud_local_bridges/4
         , manage_bridges/2
+        , lookup_from_local_node/2
         ]).
 
 -define(TYPES, [mqtt, http]).
@@ -220,7 +220,15 @@ param_path_operation()->
     }.
 
 list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
-    crud_bridges_in_cluster(post, Id, maps:remove(<<"id">>, Conf));
+    ?TRY_PARSE_ID(Id,
+        case emqx_bridge:lookup(BridgeType, BridgeName) of
+            {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+            {error, not_found} ->
+                case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
+                    ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
+                    {error, Error} -> {400, Error}
+                end
+        end);
 list_create_bridges_in_cluster(get, _Params) ->
     {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
 
@@ -229,67 +237,46 @@ list_local_bridges(Node) when Node =:= node() ->
 list_local_bridges(Node) ->
     rpc_call(Node, list_local_bridges, [Node]).
 
-crud_bridges_in_cluster(Method, #{bindings := #{id := Id}, body := Body}) ->
-    crud_bridges_in_cluster(Method, Id, Body).
-
-crud_bridges_in_cluster(Method, Id, Body) ->
-    Results = [crud_local_bridges(Node, Method, Id, Body) || Node <- mria_mnesia:running_nodes()],
-    Filter = fun ({200}) -> false;
-                 ({Code, _}) when Code == 200; Code == 201 -> false;
-                 (_) -> true
-             end,
-    case lists:filter(Filter, Results) of
-        [] ->
-            case Results of
-                [{200} | _] -> {200};
-                [{Code, _} | _] when Code == 200; Code == 201 ->
-                    {Code, format_bridge_info([Bridge || {_, Bridge} <- Results])}
-            end;
-        Errors ->
-            hd(Errors)
-    end.
-
-crud_local_bridges(Node, Method, Id, Body) when Node =/= node() ->
-    rpc_call(Node, crud_local_bridges, [Node, Method, Id, Body]);
-
-crud_local_bridges(_, get, Id, _Body) ->
-    ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of
-        {ok, Data} -> {200, format_resp(Data)};
-        {error, not_found} ->
-            {404, #{code => 102, message => <<"not_found: ", Id/binary>>}}
-    end);
+crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
+    ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
 
-crud_local_bridges(_, post, Id, Conf) ->
-    ?TRY_PARSE_ID(Id,
-        case emqx_bridge:lookup(BridgeType, BridgeName) of
-            {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
-            {error, not_found} ->
-                case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
-                    {ok, Resp} -> {201, Resp};
-                    {error, Error} -> {400, Error}
-                end
-        end);
-
-crud_local_bridges(_, put, Id, Conf) ->
+crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
     ?TRY_PARSE_ID(Id,
         case emqx_bridge:lookup(BridgeType, BridgeName) of
             {ok, _} ->
-                case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
-                    {ok, Resp} -> {200, Resp};
+                case ensure_bridge(BridgeType, BridgeName, Conf) of
+                    ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
                     {error, Error} -> {400, Error}
                 end;
             {error, not_found} ->
                 {404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
         end);
 
-crud_local_bridges(_, delete, Id, _Body) ->
+crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id,
-        case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of
+        case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+                #{override_to => cluster}) of
             {ok, _} -> {204};
             {error, Reason} ->
                 {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
         end).
 
+lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
+    case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
+        {ok, [{ok, _} | _] = Results} ->
+            {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
+        {ok, [{error, not_found} | _]} ->
+            {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
+        {error, ErrL} ->
+            {500, error_msg('UNKNOWN_ERROR', ErrL)}
+    end.
+
+lookup_from_local_node(BridgeType, BridgeName) ->
+    case emqx_bridge:lookup(BridgeType, BridgeName) of
+        {ok, Res} -> {ok, format_resp(Res)};
+        Error -> Error
+    end.
+
 manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
     OperFun =
         fun (<<"start">>) -> start;
@@ -304,13 +291,12 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
                 {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
         end).
 
-ensure_bridge(Id, BridgeType, BridgeName, Conf) ->
-    case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
-            #{rawconf_with_defaults => true}) of
-        {ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
-            {ok, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
+ensure_bridge(BridgeType, BridgeName, Conf) ->
+    case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
+            #{override_to => cluster}) of
+        {ok, _} -> ok;
         {error, Reason} ->
-            {error, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
+            {error, error_msg('BAD_ARG', Reason)}
     end.
 
 zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
@@ -368,6 +354,14 @@ format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, st
         metrics => ?METRICS(0,0,0,0,0)
     }.
 
+rpc_multicall(Func, Args) ->
+    Nodes = mria_mnesia:running_nodes(),
+    ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
+    case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
+        [] -> {ok, [Res || {ok, Res} <- ResL]};
+        ErrL -> {error, ErrL}
+    end.
+
 rpc_call(Node, Fun, Args) ->
     rpc_call(Node, ?MODULE, Fun, Args).
 
@@ -378,3 +372,8 @@ rpc_call(Node, Mod, Fun, Args) ->
         {badrpc, Reason} -> {error, Reason};
         Res -> Res
     end.
+
+error_msg(Code, Msg) when is_binary(Msg) ->
+    #{code => Code, message => Msg};
+error_msg(Code, Msg) ->
+    #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -128,7 +128,7 @@ t_http_crud_apis(_) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    %% then we add a http bridge, using PUT
+    %% then we add a http bridge, using POST
     %% POST /bridges/ will create a bridge
     URL1 = ?URL(Port, "path1"),
     {ok, 201, Bridge} = request(post, uri(["bridges"]),

+ 4 - 1
apps/emqx_conf/src/emqx_conf.erl

@@ -81,7 +81,10 @@ get_node_and_config(KeyPath) ->
     {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
 update(KeyPath, UpdateReq, Opts0) ->
     Args = [KeyPath, UpdateReq, Opts0],
-    multicall(emqx, update_config, Args).
+    case multicall(emqx, update_config, Args) of
+        {ok, _TnxId, Res} -> Res;
+        {error, Res} -> Res
+    end.
 
 %% @doc Update the specified node's key path in local-override.conf.
 -spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),

+ 33 - 0
apps/emqx_connector/src/emqx_connector.erl

@@ -24,15 +24,45 @@
 -export([ list/0
         , lookup/1
         , lookup/2
+        , create_dry_run/2
         , update/2
         , update/3
         , delete/1
         , delete/2
         ]).
 
+-export([ post_config_update/5
+        ]).
+
 config_key_path() ->
     [connectors].
 
+post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
+    ConnId = connector_id(Type, Name),
+    LinkedBridgeIds = lists:foldl(fun
+        (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc)
+                when ConnId0 == ConnId ->
+            [BId | Acc];
+        (_, Acc) -> Acc
+    end, [], emqx_bridge:list()),
+    case LinkedBridgeIds of
+        [] -> ok;
+        _ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
+    end;
+post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) ->
+    ConnId = connector_id(Type, Name),
+    lists:foreach(fun
+        (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId ->
+            {BType, BName} = emqx_bridge:parse_bridge_id(BId),
+            BridgeConf = emqx:get_config([bridges, BType, BName]),
+            case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of
+                {ok, _} -> ok;
+                {error, Reason} -> error({update_bridge_error, Reason})
+            end;
+        (_) ->
+            ok
+    end, emqx_bridge:list()).
+
 connector_id(Type0, Name0) ->
     Type = bin(Type0),
     Name = bin(Name0),
@@ -62,6 +92,9 @@ lookup(Type, Name) ->
         Conf -> {ok, Conf#{<<"id">> => Id}}
     end.
 
+create_dry_run(Type, Conf) ->
+    emqx_bridge:create_dry_run(Type, Conf).
+
 update(Id, Conf) when is_binary(Id) ->
     {Type, Name} = parse_connector_id(Id),
     update(Type, Name, Conf).

+ 34 - 5
apps/emqx_connector/src/emqx_connector_api.erl

@@ -28,11 +28,13 @@
 -export([api_spec/0, paths/0, schema/1, namespace/0]).
 
 %% API callbacks
--export(['/connectors'/2, '/connectors/:id'/2]).
+-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
 
 -define(TRY_PARSE_ID(ID, EXPR),
     try emqx_connector:parse_connector_id(Id) of
-        {ConnType, ConnName} -> EXPR
+        {ConnType, ConnName} ->
+            _ = ConnName,
+            EXPR
     catch
         error:{invalid_bridge_id, Id0} ->
             {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
@@ -44,7 +46,7 @@ namespace() -> "connector".
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
 
-paths() -> ["/connectors", "/connectors/:id"].
+paths() -> ["/connectors_test", "/connectors", "/connectors/:id"].
 
 error_schema(Code, Message) ->
     [ {code, mk(string(), #{example => Code})}
@@ -55,6 +57,10 @@ connector_info() ->
     hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
                   ]).
 
+connector_test_info() ->
+    hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info")
+                  ]).
+
 connector_req() ->
     hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
                   ]).
@@ -62,6 +68,22 @@ connector_req() ->
 param_path_id() ->
     [{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
 
+schema("/connectors_test") ->
+    #{
+        operationId => '/connectors_test',
+        post => #{
+            tags => [<<"connectors">>],
+            description => <<"Test creating a new connector by given Id <br>"
+                             "The Id must be of format <type>:<name>">>,
+            summary => <<"Test creating connector">>,
+            requestBody => connector_test_info(),
+            responses => #{
+                200 => <<"Test connector OK">>,
+                400 => error_schema('TEST_FAILED', "connector test failed")
+            }
+        }
+    };
+
 schema("/connectors") ->
     #{
         operationId => '/connectors',
@@ -116,11 +138,18 @@ schema("/connectors/:id") ->
             summary => <<"Delete connector">>,
             parameters => param_path_id(),
             responses => #{
-                200 => <<"Delete connector successfully">>,
+                204 => <<"Delete connector successfully">>,
                 400 => error_schema('DELETE_FAIL', "Delete failed")
             }}
     }.
 
+'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) ->
+    case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of
+        ok -> {200};
+        {error, Error} ->
+            {400, error_msg('BAD_ARG', Error)}
+    end.
+
 '/connectors'(get, _Request) ->
     {200, emqx_connector:list()};
 
@@ -161,7 +190,7 @@ schema("/connectors/:id") ->
         case emqx_connector:lookup(ConnType, ConnName) of
             {ok, _} ->
                 case emqx_connector:delete(ConnType, ConnName) of
-                    {ok, _} -> {200};
+                    {ok, _} -> {204};
                     {error, Error} -> {400, error_msg('BAD_ARG', Error)}
                 end;
             {error, not_found} ->

+ 4 - 0
apps/emqx_connector/src/emqx_connector_app.erl

@@ -20,11 +20,15 @@
 
 -export([start/2, stop/1]).
 
+-define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])).
+
 start(_StartType, _StartArgs) ->
+    ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector),
     emqx_connector_mqtt_worker:register_metrics(),
     emqx_connector_sup:start_link().
 
 stop(_State) ->
+    emqx_config_handler:remove_handler(?CONF_HDLR_PATH),
     ok.
 
 %% internal functions

+ 4 - 0
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -138,6 +138,8 @@ on_health_check(_InstId, #{name := InstanceId} = State) ->
         _ -> {error, {connector_down, InstanceId}, State}
     end.
 
+make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
+    undefined;
 make_sub_confs(undefined) ->
     undefined;
 make_sub_confs(SubRemoteConf) ->
@@ -148,6 +150,8 @@ make_sub_confs(SubRemoteConf) ->
             SubConf#{on_message_received => MFA}
     end.
 
+make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
+    undefined;
 make_forward_confs(undefined) ->
     undefined;
 make_forward_confs(FrowardConf) ->

+ 4 - 0
apps/emqx_connector/src/emqx_connector_schema.erl

@@ -25,6 +25,10 @@ fields("mqtt_connector") ->
 
 fields("mqtt_connector_info") ->
     [{id, sc(binary(), #{desc => "The connector Id"})}]
+    ++ fields("mqtt_connector");
+
+fields("mqtt_connector_test_info") ->
+    [{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}]
     ++ fields("mqtt_connector").
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).

+ 2 - 2
apps/emqx_connector/src/emqx_connector_schema_lib.erl

@@ -105,7 +105,7 @@ servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be em
 servers(_) -> undefined.
 
 to_ip_port(Str) ->
-     case string:tokens(Str, ":") of
+     case string:tokens(Str, ": ") of
          [Ip, Port] ->
              case inet:parse_address(Ip) of
                  {ok, R} -> {ok, {R, list_to_integer(Port)}};
@@ -121,7 +121,7 @@ ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
 
 to_servers(Str) ->
     {ok, lists:map(fun(Server) ->
-             case string:tokens(Server, ":") of
+             case string:tokens(Server, ": ") of
                  [Ip] ->
                      [{host, Ip}];
                  [Ip, Port] ->

+ 308 - 0
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -0,0 +1,308 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_api_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx/include/emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(CONF_DEFAULT, <<"connectors: {}">>).
+-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
+-define(CONNECTR_ID, <<"mqtt:test_connector">>).
+-define(BRIDGE_ID, <<"mqtt:test_bridge">>).
+-define(MQTT_CONNECOTR(Username),
+#{
+    <<"server">> => <<"127.0.0.1:1883">>,
+    <<"username">> => Username,
+    <<"password">> => <<"">>,
+    <<"proto_ver">> => <<"v4">>,
+    <<"ssl">> => #{<<"enable">> => false}
+}).
+-define(MQTT_CONNECOTR2(Server),
+    ?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
+
+-define(MQTT_BRIDGE(ID),
+#{
+    <<"connector">> => ID,
+    <<"direction">> => <<"ingress">>,
+    <<"from_remote_topic">> => <<"remote_topic/#">>,
+    <<"to_local_topic">> => <<"local_topic/${topic}">>,
+    <<"subscribe_qos">> => 1,
+    <<"payload">> => <<"${payload}">>,
+    <<"qos">> => <<"${qos}">>,
+    <<"retain">> => <<"${retain}">>
+}).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+suite() ->
+	[{timetrap,{seconds,30}}].
+
+init_per_suite(Config) ->
+    ok = emqx_config:put([emqx_dashboard], #{
+        default_username => <<"admin">>,
+        default_password => <<"public">>,
+        listeners => [#{
+            protocol => http,
+            port => 18083
+        }]
+    }),
+    ok = application:load(emqx_conf),
+    ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
+    ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
+    ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
+    Config.
+
+end_per_suite(_Config) ->
+    ok = ekka:stop(),
+    emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
+    ok.
+
+init_per_testcase(_, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+    Config.
+end_per_testcase(_, _Config) ->
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_mqtt_crud_apis(_) ->
+    %% assert we there's no connectors at first
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+
+    %% then we add a mqtt connector, using POST
+    %% POST /connectors/ will create a connector
+    User1 = <<"user1">>,
+    {ok, 201, Connector} = request(post, uri(["connectors"]),
+                                ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+
+    %ct:pal("---connector: ~p", [Connector]),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  , <<"username">> := User1
+                  , <<"password">> := <<"">>
+                  , <<"proto_ver">> := <<"v4">>
+                  , <<"ssl">> := #{<<"enable">> := false}
+                  }, jsx:decode(Connector)),
+
+    %% create a again returns an error
+    {ok, 400, RetMsg} = request(post, uri(["connectors"]),
+                                ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+    ?assertMatch(
+        #{ <<"code">> := _
+         , <<"message">> := <<"connector already exists">>
+         }, jsx:decode(RetMsg)),
+
+    %% update the request-path of the connector
+    User2 = <<"user2">>,
+    {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+                                 ?MQTT_CONNECOTR(User2)),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  , <<"username">> := User2
+                  , <<"password">> := <<"">>
+                  , <<"proto_ver">> := <<"v4">>
+                  , <<"ssl">> := #{<<"enable">> := false}
+                  }, jsx:decode(Connector2)),
+
+    %% list all connectors again, assert Connector2 is in it
+    {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
+    ?assertMatch([#{ <<"id">> := ?CONNECTR_ID
+                   , <<"server">> := <<"127.0.0.1:1883">>
+                   , <<"username">> := User2
+                   , <<"password">> := <<"">>
+                   , <<"proto_ver">> := <<"v4">>
+                   , <<"ssl">> := #{<<"enable">> := false}
+                   }], jsx:decode(Connector2Str)),
+
+    %% get the connector by id
+    {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  , <<"username">> := User2
+                  , <<"password">> := <<"">>
+                  , <<"proto_ver">> := <<"v4">>
+                  , <<"ssl">> := #{<<"enable">> := false}
+                  }, jsx:decode(Connector3Str)),
+
+    %% delete the connector
+    {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+
+    %% update a deleted connector returns an error
+    {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+                                 ?MQTT_CONNECOTR(User2)),
+    ?assertMatch(
+        #{ <<"code">> := _
+         , <<"message">> := <<"connector not found">>
+         }, jsx:decode(ErrMsg2)),
+    ok.
+
+t_mqtt_conn_bridge(_) ->
+    %% assert we there's no connectors and no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a mqtt connector, using POST
+    User1 = <<"user1">>,
+    {ok, 201, Connector} = request(post, uri(["connectors"]),
+                                ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+
+    %ct:pal("---connector: ~p", [Connector]),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  , <<"username">> := User1
+                  , <<"password">> := <<"">>
+                  , <<"proto_ver">> := <<"v4">>
+                  , <<"ssl">> := #{<<"enable">> := false}
+                  }, jsx:decode(Connector)),
+
+    %% ... and a MQTT bridge, using POST
+    %% we bind this bridge to the connector created just now
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+                                ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
+
+    %ct:pal("---bridge: ~p", [Bridge]),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+                  , <<"bridge_type">> := <<"mqtt">>
+                  , <<"status">> := <<"connected">>
+                  , <<"connector">> := ?CONNECTR_ID
+                  }, jsx:decode(Bridge)),
+
+    %% we now test if the bridge works as expected
+
+    RemoteTopic = <<"remote_topic/1">>,
+    LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
+    Payload = <<"hello">>,
+    emqx:subscribe(LocalTopic),
+    %% PUBLISH a message to the 'remote' broker, as we have only one broker,
+    %% the remote broker is also the local one.
+    emqx:publish(emqx_message:make(RemoteTopic, Payload)),
+
+    %% we should receive a message on the local broker, with specified topic
+    ?assert(
+        receive
+            {deliver, LocalTopic, #message{payload = Payload}} ->
+                ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
+                true;
+            Msg ->
+                ct:pal("Msg: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% delete the connector
+    {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    ok.
+
+%% t_mqtt_conn_update:
+%% - update a connector should also update all of the the bridges
+%% - cannot delete a connector that is used by at least one bridge
+t_mqtt_conn_update(_) ->
+    %% assert we there's no connectors and no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a mqtt connector, using POST
+    {ok, 201, Connector} = request(post, uri(["connectors"]),
+                                ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}),
+
+    %ct:pal("---connector: ~p", [Connector]),
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:1883">>
+                  }, jsx:decode(Connector)),
+
+    %% ... and a MQTT bridge, using POST
+    %% we bind this bridge to the connector created just now
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+                                ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+                  , <<"bridge_type">> := <<"mqtt">>
+                  , <<"status">> := <<"connected">>
+                  , <<"connector">> := ?CONNECTR_ID
+                  }, jsx:decode(Bridge)),
+
+    %% then we try to update 'server' of the connector, to an unavailable IP address
+    %% the update should fail because of 'unreachable' or 'connrefused'
+    {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
+                                 ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)),
+    %% we fix the 'server' parameter to a normal one, it should work
+    {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
+                                 ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% delete the connector
+    {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
+
+t_mqtt_conn_testing(_) ->
+    %% APIs for testing the connectivity
+    %% then we add a mqtt connector, using POST
+    {ok, 200, <<>>} = request(post, uri(["connectors_test"]),
+        ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}),
+    {ok, 400, _} = request(post, uri(["connectors_test"]),
+        ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}).
+
+%%--------------------------------------------------------------------
+%% HTTP Request
+%%--------------------------------------------------------------------
+-define(HOST, "http://127.0.0.1:18083/").
+-define(API_VERSION, "v5").
+-define(BASE_PATH, "api").
+
+request(Method, Url, Body) ->
+    Request = case Body of
+        [] -> {Url, [auth_header_()]};
+        _ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
+    end,
+    ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+    case httpc:request(Method, Request, [], [{body_format, binary}]) of
+        {error, socket_closed_remotely} ->
+            {error, socket_closed_remotely};
+        {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
+            {ok, Code, Return};
+        {ok, {Reason, _, _}} ->
+            {error, Reason}
+    end.
+
+uri() -> uri([]).
+uri(Parts) when is_list(Parts) ->
+    NParts = [E || E <- Parts],
+    ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
+
+auth_header_() ->
+    Username = <<"admin">>,
+    Password = <<"public">>,
+    {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
+    {"Authorization", "Bearer " ++ binary_to_list(Token)}.
+

+ 22 - 19
apps/emqx_gateway/src/emqx_gateway_conf.erl

@@ -52,8 +52,8 @@
         ]).
 
 %% callbacks for emqx_config_handler
--export([ pre_config_update/2
-        , post_config_update/4
+-export([ pre_config_update/3
+        , post_config_update/5
         ]).
 
 -type atom_or_bin() :: atom() | binary().
@@ -246,10 +246,11 @@ bin(B) when is_binary(B) ->
 %% Config Handler
 %%--------------------------------------------------------------------
 
--spec pre_config_update(emqx_config:update_request(),
+-spec pre_config_update(list(atom()),
+                        emqx_config:update_request(),
                         emqx_config:raw_config()) ->
     {ok, emqx_config:update_request()} | {error, term()}.
-pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
+pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
     case maps:get(GwName, RawConf, undefined) of
         undefined ->
             NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
@@ -257,7 +258,7 @@ pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
         _ ->
             {error, already_exist}
     end;
-pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
+pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
     case maps:get(GwName, RawConf, undefined) of
         undefined ->
             {error, not_found};
@@ -266,14 +267,14 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
                                   <<"authentication">>], Conf),
             {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
     end;
-pre_config_update({unload_gateway, GwName}, RawConf) ->
+pre_config_update(_, {unload_gateway, GwName}, RawConf) ->
     _ = tune_gw_certs(fun clear_certs/2,
                       GwName,
                       maps:get(GwName, RawConf, #{})
                      ),
     {ok, maps:remove(GwName, RawConf)};
 
-pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
         undefined ->
@@ -285,7 +286,7 @@ pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
         _ ->
             {error, already_exist}
     end;
-pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
         undefined ->
@@ -298,7 +299,7 @@ pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
                    #{GwName => #{<<"listeners">> => NListener}})}
 
     end;
-pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
+pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) ->
     Path = [GwName, <<"listeners">>, LType, LName],
     case emqx_map_lib:deep_get(Path, RawConf, undefined) of
          undefined ->
@@ -308,7 +309,7 @@ pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
             {ok, emqx_map_lib:deep_remove(Path, RawConf)}
     end;
 
-pre_config_update({add_authn, GwName, Conf}, RawConf) ->
+pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"authentication">>], RawConf, undefined) of
         undefined ->
@@ -318,7 +319,7 @@ pre_config_update({add_authn, GwName, Conf}, RawConf) ->
         _ ->
             {error, already_exist}
     end;
-pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName],
            RawConf, undefined) of
@@ -336,7 +337,7 @@ pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
                     {error, already_exist}
             end
     end;
-pre_config_update({update_authn, GwName, Conf}, RawConf) ->
+pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"authentication">>], RawConf, undefined) of
         undefined ->
@@ -346,7 +347,7 @@ pre_config_update({update_authn, GwName, Conf}, RawConf) ->
                    RawConf,
                    #{GwName => #{<<"authentication">> => Conf}})}
     end;
-pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
     case emqx_map_lib:deep_get(
            [GwName, <<"listeners">>, LType, LName],
            RawConf, undefined) of
@@ -368,22 +369,24 @@ pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
                     {ok, emqx_map_lib:deep_merge(RawConf, NGateway)}
             end
     end;
-pre_config_update({remove_authn, GwName}, RawConf) ->
+pre_config_update(_, {remove_authn, GwName}, RawConf) ->
     {ok, emqx_map_lib:deep_remove(
            [GwName, <<"authentication">>], RawConf)};
-pre_config_update({remove_authn, GwName, {LType, LName}}, RawConf) ->
+pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
     Path = [GwName, <<"listeners">>, LType, LName, <<"authentication">>],
     {ok, emqx_map_lib:deep_remove(Path, RawConf)};
 
-pre_config_update(UnknownReq, _RawConf) ->
+pre_config_update(_, UnknownReq, _RawConf) ->
     logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
     {error, badreq}.
 
--spec post_config_update(emqx_config:update_request(), emqx_config:config(),
+-spec post_config_update(list(atom()),
+                         emqx_config:update_request(),
+                         emqx_config:config(),
                          emqx_config:config(), emqx_config:app_envs())
     -> ok | {ok, Result::any()} | {error, Reason::term()}.
 
-post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
+post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
     [_Tag, GwName0 | _] = tuple_to_list(Req),
     GwName = binary_to_existing_atom(GwName0),
 
@@ -398,7 +401,7 @@ post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
         {New, Old} when is_map(New), is_map(Old) ->
             emqx_gateway:update(GwName, New)
     end;
-post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) ->
+post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
     ok.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -84,7 +84,7 @@
         % , inc_counter/3 %% increment the counter by a given integer
         ]).
 
--define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => false}).
+-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
 
 -optional_callbacks([ on_query/4
                     , on_health_check/2

+ 2 - 2
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -25,7 +25,7 @@
 
 -export([start_link/0]).
 
--export([ post_config_update/4
+-export([ post_config_update/5
         , config_key_path/0
         ]).
 
@@ -81,7 +81,7 @@ start_link() ->
 %%------------------------------------------------------------------------------
 %% The config handler for emqx_rule_engine
 %%------------------------------------------------------------------------------
-post_config_update(_Req, NewRules, OldRules, _AppEnvs) ->
+post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Updated}
         = emqx_map_lib:diff_maps(NewRules, OldRules),
     maps_foreach(fun({Id, {_Old, New}}) ->