소스 검색

Merge pull request #7251 from terry-xiaoyu/add_restart_bridge_api

feat(bridge): add APIs for restart/stop bridges on one node
Xinyu Liu 4 년 전
부모
커밋
67e39150d0
27개의 변경된 파일226개의 추가작업 그리고 98개의 파일을 삭제
  1. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  2. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl
  3. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
  4. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
  5. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl
  6. 1 1
      apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl
  7. 1 1
      apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl
  8. 1 1
      apps/emqx_authn/test/emqx_authn_redis_SUITE.erl
  9. 1 1
      apps/emqx_authz/src/emqx_authz_postgresql.erl
  10. 1 1
      apps/emqx_authz/src/emqx_authz_utils.erl
  11. 1 1
      apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
  12. 1 1
      apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl
  13. 1 1
      apps/emqx_authz/test/emqx_authz_redis_SUITE.erl
  14. 5 8
      apps/emqx_bridge/src/emqx_bridge.erl
  15. 83 15
      apps/emqx_bridge/src/emqx_bridge_api.erl
  16. 9 17
      apps/emqx_bridge/src/emqx_bridge_app.erl
  17. 17 2
      apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl
  18. 68 8
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  19. 7 7
      apps/emqx_connector/src/emqx_connector.erl
  20. 6 6
      apps/emqx_connector/src/emqx_connector_api.erl
  21. 2 11
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  22. 1 1
      apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
  23. 1 1
      apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
  24. 1 1
      apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
  25. 2 0
      apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl
  26. 10 7
      apps/emqx_resource/src/emqx_resource_instance.erl
  27. 1 1
      apps/emqx_retainer/src/emqx_retainer.erl

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -134,7 +134,7 @@ create(#{method := Method,
                                     emqx_connector_http,
                                     Config#{base_url => maps:remove(query, URIMap),
                                             pool_type => random},
-                                            #{waiting_connect_complete => 5000}) of
+                                            #{}) of
         {ok, already_created} ->
             {ok, State};
         {ok, _} ->

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -116,7 +116,7 @@ create(#{selector := Selector} = Config) ->
                                     ?RESOURCE_GROUP,
                                     emqx_connector_mongo,
                                     Config,
-                                    #{waiting_connect_complete => 5000}) of
+                                    #{}) of
         {ok, already_created} ->
             {ok, NState};
         {ok, _} ->

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -85,7 +85,7 @@ create(#{password_hash_algorithm := Algorithm,
                                     ?RESOURCE_GROUP,
                                     emqx_connector_mysql,
                                     Config,
-                                    #{waiting_connect_complete => 5000}) of
+                                    #{}) of
         {ok, already_created} ->
             {ok, State};
         {ok, _} ->

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -81,7 +81,7 @@ create(#{query := Query0,
               resource_id => ResourceId},
     case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql,
                                     Config#{named_queries => #{ResourceId => Query}},
-                                    #{waiting_connect_complete => 5000}) of
+                                    #{}) of
         {ok, already_created} ->
             {ok, State};
         {ok, _} ->

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl

@@ -93,7 +93,7 @@ create(#{cmd := Cmd,
                    resource_id => ResourceId},
         case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP,
                                         emqx_connector_redis, Config,
-                                        #{waiting_connect_complete => 5000}) of
+                                        #{}) of
             {ok, already_created} ->
                 {ok, NState};
             {ok, _} ->

+ 1 - 1
apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl

@@ -63,7 +63,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_mysql,
               mysql_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_mysql}

+ 1 - 1
apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl

@@ -64,7 +64,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_pgsql,
               pgsql_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_pgsql}

+ 1 - 1
apps/emqx_authn/test/emqx_authn_redis_SUITE.erl

@@ -63,7 +63,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_redis,
               redis_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_redis}

+ 1 - 1
apps/emqx_authz/src/emqx_authz_postgresql.erl

@@ -56,7 +56,7 @@ init(#{query := SQL0} = Source) ->
             ?RESOURCE_GROUP,
             emqx_connector_pgsql,
             Source#{named_queries => #{ResourceID => SQL}},
-            #{waiting_connect_complete => 5000}) of
+            #{}) of
         {ok, _} ->
             Source#{annotations =>
                         #{id => ResourceID,

+ 1 - 1
apps/emqx_authz/src/emqx_authz_utils.erl

@@ -38,7 +38,7 @@ create_resource(Module, Config) ->
     case emqx_resource:create_local(ResourceID,
                                     ?RESOURCE_GROUP,
                                     Module, Config,
-                                    #{waiting_connect_complete => 5000}) of
+                                    #{}) of
         {ok, already_created} -> {ok, ResourceID};
         {ok, _} -> {ok, ResourceID};
         {error, Reason} -> {error, Reason}

+ 1 - 1
apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl

@@ -45,7 +45,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_mysql,
               mysql_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_mysql}

+ 1 - 1
apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl

@@ -45,7 +45,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_pgsql,
               pgsql_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_pgsql}

+ 1 - 1
apps/emqx_authz/test/emqx_authz_redis_SUITE.erl

@@ -46,7 +46,7 @@ init_per_suite(Config) ->
               ?RESOURCE_GROUP,
               emqx_connector_redis,
               redis_config(),
-              #{waiting_connect_complete => 5000}),
+              #{}),
             Config;
         false ->
             {skip, no_redis}

+ 5 - 8
apps/emqx_bridge/src/emqx_bridge.erl

@@ -50,7 +50,6 @@
         , remove/2
         , update/2
         , update/3
-        , start/2
         , stop/2
         , restart/2
         ]).
@@ -208,12 +207,10 @@ lookup(Type, Name, RawConf) ->
                    raw_config => RawConf}}
     end.
 
-start(Type, Name) ->
-    restart(Type, Name).
-
 stop(Type, Name) ->
     emqx_resource:stop(resource_id(Type, Name)).
 
+%% we don't provide 'start', as we want an already started bridge to be restarted.
 restart(Type, Name) ->
     emqx_resource:restart(resource_id(Type, Name)).
 
@@ -228,7 +225,7 @@ create(Type, Name, Conf) ->
                             <<"emqx_bridge">>,
                             emqx_bridge:resource_type(Type),
                             parse_confs(Type, Name, Conf),
-                            #{waiting_connect_complete => 5000}) of
+                            #{}) of
         {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
         {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
         {error, Reason} -> {error, Reason}
@@ -263,8 +260,8 @@ update(Type, Name, {OldConf, Conf}) ->
             %% we don't need to recreate the bridge if this config change is only to
             %% toggole the config 'bridge.{type}.{name}.enable'
             case maps:get(enable, Conf, true) of
-                false -> stop(Type, Name);
-                true -> start(Type, Name)
+                true -> restart(Type, Name);
+                false -> stop(Type, Name)
             end
     end.
 
@@ -275,7 +272,7 @@ recreate(Type, Name, Conf) ->
     emqx_resource:recreate_local(resource_id(Type, Name),
         emqx_bridge:resource_type(Type),
         parse_confs(Type, Name, Conf),
-        #{waiting_connect_complete => 5000}).
+        #{}).
 
 create_dry_run(Type, Conf) ->
     Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

+ 83 - 15
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -33,6 +33,7 @@
 -export([ '/bridges'/2
         , '/bridges/:id'/2
         , '/bridges/:id/operation/:operation'/2
+        , '/nodes/:node/bridges/:id/operation/:operation'/2
         ]).
 
 -export([ lookup_from_local_node/2
@@ -74,7 +75,8 @@ namespace() -> "bridge".
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
 
-paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"].
+paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation",
+            "/nodes/:node/bridges/:id/operation/:operation"].
 
 error_schema(Code, Message) when is_atom(Code) ->
     error_schema([Code], Message);
@@ -87,11 +89,28 @@ get_response_body_schema() ->
     emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
         bridge_info_examples(get)).
 
-param_path_operation() ->
-    {operation, mk(enum([start, stop, restart]),
+param_path_operation_cluster() ->
+    {operation, mk(enum([enable, disable, stop, restart]),
         #{ in => path
          , required => true
          , example => <<"start">>
+         , desc => <<"Operations can be one of: enable, disable, start, stop, restart">>
+         })}.
+
+param_path_operation_on_node() ->
+    {operation, mk(enum([stop, restart]),
+        #{ in => path
+         , required => true
+         , example => <<"start">>
+         , desc => <<"Operations can be one of: start, stop, restart">>
+         })}.
+
+param_path_node() ->
+    {node, mk(binary(),
+        #{ in => path
+         , required => true
+         , example => <<"emqx@127.0.0.1">>
+         , desc => <<"The bridge Id. Must be of format {type}:{name}">>
          })}.
 
 param_path_id() ->
@@ -219,7 +238,7 @@ schema("/bridges") ->
                             bridge_info_examples(post)),
             responses => #{
                 201 => get_response_body_schema(),
-                400 => error_schema('BAD_REQUEST', "Create bridge failed")
+                400 => error_schema('ALREADY_EXISTS', "Bridge already exists")
             }
         }
     };
@@ -267,11 +286,32 @@ schema("/bridges/:id/operation/:operation") ->
         'operationId' => '/bridges/:id/operation/:operation',
         post => #{
             tags => [<<"bridges">>],
-            summary => <<"Start/Stop/Restart Bridge">>,
-            description => <<"Start/Stop/Restart bridges on a specific node.">>,
+            summary => <<"Enable/Disable/Stop/Restart Bridge">>,
+            description => <<"Enable/Disable/Stop/Restart bridges on all nodes"
+                " in the cluster.">>,
             parameters => [
                 param_path_id(),
-                param_path_operation()
+                param_path_operation_cluster()
+            ],
+            responses => #{
+                500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
+                200 => <<"Operation success">>
+            }
+        }
+    };
+
+schema("/nodes/:node/bridges/:id/operation/:operation") ->
+    #{
+        'operationId' => '/nodes/:node/bridges/:id/operation/:operation',
+        post => #{
+            tags => [<<"bridges">>],
+            summary => <<"Stop/Restart Bridge">>,
+            description => <<"Stop/Restart bridges on a specific node.\n"
+                "NOTE: It's not allowed to disable/enable bridges on a single node.">>,
+            parameters => [
+                param_path_node(),
+                param_path_id(),
+                param_path_operation_on_node()
             ],
             responses => #{
                 500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
@@ -341,23 +381,51 @@ lookup_from_local_node(BridgeType, BridgeName) ->
 
 '/bridges/:id/operation/:operation'(post, #{bindings :=
         #{id := Id, operation := Op}}) ->
-    ?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of
+    ?TRY_PARSE_ID(Id, case operation_func(Op) of
         invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
-        UpReq ->
+        OperFunc when OperFunc == enable; OperFunc == disable ->
             case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
-                    {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of
+                    {OperFunc, BridgeType, BridgeName}, #{override_to => cluster}) of
                 {ok, _} -> {200};
                 {error, {pre_config_update, _, bridge_not_found}} ->
                     {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
                 {error, Reason} ->
                     {500, error_msg('INTERNAL_ERROR', Reason)}
+            end;
+        OperFunc ->
+            Nodes = mria_mnesia:running_nodes(),
+            operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
+    end).
+
+'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
+        #{id := Id, operation := Op}}) ->
+    ?TRY_PARSE_ID(Id, case operation_func(Op) of
+        invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
+        OperFunc when OperFunc == restart; OperFunc == stop ->
+            case emqx_bridge:OperFunc(BridgeType, BridgeName) of
+                ok -> {200};
+                {error, Reason} ->
+                    {500, error_msg('INTERNAL_ERROR', Reason)}
             end
     end).
 
-operation_to_conf_req(<<"start">>) -> start;
-operation_to_conf_req(<<"stop">>) -> stop;
-operation_to_conf_req(<<"restart">>) -> restart;
-operation_to_conf_req(_) -> invalid.
+operation_func(<<"stop">>) -> stop;
+operation_func(<<"restart">>) -> restart;
+operation_func(<<"enable">>) -> enable;
+operation_func(<<"disable">>) -> disable;
+operation_func(_) -> invalid.
+
+operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
+    RpcFunc = case OperFunc of
+        restart -> restart_bridges_to_all_nodes;
+        stop -> stop_bridges_to_all_nodes
+    end,
+    case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
+        {ok, _} ->
+            {200};
+        {error, ErrL} ->
+            {500, error_msg('INTERNAL_ERROR', ErrL)}
+    end.
 
 ensure_bridge_created(BridgeType, BridgeName, Conf) ->
     case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
@@ -437,7 +505,7 @@ format_metrics(#{
 
 
 is_ok(ResL) ->
-    case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
+    case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of
         [] -> {ok, [Res || {ok, Res} <- ResL]};
         ErrL -> {error, ErrL}
     end.

+ 9 - 17
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -39,24 +39,16 @@ stop(_State) ->
     ok = emqx_bridge:unload_hook(),
     ok.
 
--define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
-pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) ->
+%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the
+%%       underlying resources.
+pre_config_update(_, {_Oper, _, _}, undefined) ->
     {error, bridge_not_found};
-pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) ->
-    case perform_operation(Oper, Type, Name) of
-        ok ->
-            %% we also need to save the 'enable' to the config files
-            {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
-        {error, _} = Err -> Err
-    end;
-pre_config_update(_, Conf, _OldConfig) ->
+pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
+    %% to save the 'enable' to the config files
+    {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+pre_config_update(_, Conf, _OldConfig) when is_map(Conf) ->
     {ok, Conf}.
 
 %% internal functions
-operation_to_enable(start) -> true;
-operation_to_enable(stop) -> false;
-operation_to_enable(restart) -> true.
-
-perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name);
-perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name);
-perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name).
+operation_to_enable(disable) -> false;
+operation_to_enable(enable) -> true.

+ 17 - 2
apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl

@@ -22,6 +22,8 @@
 
         , list_bridges/1
         , lookup_from_all_nodes/3
+        , restart_bridges_to_all_nodes/3
+        , stop_bridges_to_all_nodes/3
         ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -37,7 +39,20 @@ list_bridges(Node) ->
 
 -type key() :: atom() | binary() | [byte()].
 
+-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
+        emqx_rpc:erpc_multicall().
+restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(Nodes, emqx_bridge, restart,
+        [BridgeType, BridgeName], ?TIMEOUT).
+
+-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
+        emqx_rpc:erpc_multicall().
+stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(Nodes, emqx_bridge, stop,
+        [BridgeType, BridgeName], ?TIMEOUT).
+
 -spec lookup_from_all_nodes([node()], key(), key()) ->
-          emqx_rpc:erpc_multicall().
+        emqx_rpc:erpc_multicall().
 lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
-    erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT).
+    erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node,
+        [BridgeType, BridgeName], ?TIMEOUT).

+ 68 - 8
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -79,8 +79,14 @@ init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     Config.
 end_per_testcase(_, _Config) ->
+    clear_resources(),
     ok.
 
+clear_resources() ->
+    lists:foreach(fun(#{type := Type, name := Name}) ->
+            ok = emqx_bridge:remove(Type, Name)
+        end, emqx_bridge:list()).
+
 %%------------------------------------------------------------------------------
 %% HTTP server for testing
 %%------------------------------------------------------------------------------
@@ -239,6 +245,11 @@ t_http_crud_apis(_) ->
     ok.
 
 t_start_stop_bridges(_) ->
+    lists:foreach(fun(Type) ->
+            do_start_stop_bridges(Type)
+        end, [node, cluster]).
+
+do_start_stop_bridges(Type) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
@@ -249,7 +260,7 @@ t_start_stop_bridges(_) ->
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{ <<"type">> := ?BRIDGE_TYPE
      , <<"name">> := ?BRIDGE_NAME
-     , <<"status">> := _
+     , <<"status">> := <<"connected">>
      , <<"node_status">> := [_|_]
      , <<"metrics">> := _
      , <<"node_metrics">> := [_|_]
@@ -257,24 +268,68 @@ t_start_stop_bridges(_) ->
      } = jsx:decode(Bridge),
     BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
     %% stop it
-    {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
+    {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{ <<"status">> := <<"disconnected">>
                   }, jsx:decode(Bridge2)),
     %% start again
-    {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
+    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{ <<"status">> := <<"connected">>
                   }, jsx:decode(Bridge3)),
     %% restart an already started bridge
-    {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
+    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{ <<"status">> := <<"connected">>
                   }, jsx:decode(Bridge3)),
     %% stop it again
-    {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
+    {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     %% restart a stopped bridge
-    {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
+    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
+    {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
+    ?assertMatch(#{ <<"status">> := <<"connected">>
+                  }, jsx:decode(Bridge4)),
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
+
+t_enable_disable_bridges(_) ->
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    Port = start_http_server(fun handle_fun_200_ok/2),
+    URL1 = ?URL(Port, "abc"),
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
+    %ct:pal("the bridge ==== ~p", [Bridge]),
+    #{ <<"type">> := ?BRIDGE_TYPE
+     , <<"name">> := ?BRIDGE_NAME
+     , <<"status">> := <<"connected">>
+     , <<"node_status">> := [_|_]
+     , <<"metrics">> := _
+     , <<"node_metrics">> := [_|_]
+     , <<"url">> := URL1
+     } = jsx:decode(Bridge),
+    BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    %% disable it
+    {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
+    {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
+    ?assertMatch(#{ <<"status">> := <<"disconnected">>
+                  }, jsx:decode(Bridge2)),
+    %% enable again
+    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
+    {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
+    ?assertMatch(#{ <<"status">> := <<"connected">>
+                  }, jsx:decode(Bridge3)),
+    %% enable an already started bridge
+    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
+    {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
+    ?assertMatch(#{ <<"status">> := <<"connected">>
+                  }, jsx:decode(Bridge3)),
+    %% disable it again
+    {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
+    %% enable a stopped bridge
+    {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
     {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{ <<"status">> := <<"connected">>
                   }, jsx:decode(Bridge4)),
@@ -307,7 +362,7 @@ request(Method, Url, Body) ->
 uri() -> uri([]).
 uri(Parts) when is_list(Parts) ->
     NParts = [E || E <- Parts],
-    ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
+    ?HOST ++ str(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
 
 auth_header_() ->
     Username = <<"bridge_admin">>,
@@ -315,5 +370,10 @@ auth_header_() ->
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     {"Authorization", "Bearer " ++ binary_to_list(Token)}.
 
-operation_path(Oper, BridgeID) ->
+operation_path(node, Oper, BridgeID) ->
+    uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);
+operation_path(cluster, Oper, BridgeID) ->
     uri(["bridges", BridgeID, "operation", Oper]).
+
+str(S) when is_list(S) -> S;
+str(S) when is_binary(S) -> binary_to_list(S).

+ 7 - 7
apps/emqx_connector/src/emqx_connector.erl

@@ -21,9 +21,9 @@
         , connector_id/2
         ]).
 
--export([ list/0
-        , lookup/1
-        , lookup/2
+-export([ list_raw/0
+        , lookup_raw/1
+        , lookup_raw/2
         , create_dry_run/2
         , update/2
         , update/3
@@ -68,18 +68,18 @@ parse_connector_id(ConnectorId) ->
         _ -> error({invalid_connector_id, ConnectorId})
     end.
 
-list() ->
+list_raw() ->
     lists:foldl(fun({Type, NameAndConf}, Connectors) ->
             lists:foldl(fun({Name, RawConf}, Acc) ->
                    [RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc]
                 end, Connectors, maps:to_list(NameAndConf))
         end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))).
 
-lookup(Id) when is_binary(Id) ->
+lookup_raw(Id) when is_binary(Id) ->
     {Type, Name} = parse_connector_id(Id),
-    lookup(Type, Name).
+    lookup_raw(Type, Name).
 
-lookup(Type, Name) ->
+lookup_raw(Type, Name) ->
     case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of
         not_found -> {error, not_found};
         Conf -> {ok, Conf#{<<"type">> => Type, <<"name">> => Name}}

+ 6 - 6
apps/emqx_connector/src/emqx_connector_api.erl

@@ -205,10 +205,10 @@ schema("/connectors/:id") ->
     end.
 
 '/connectors'(get, _Request) ->
-    {200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
+    {200, [format_resp(Conn) || Conn <- emqx_connector:list_raw()]};
 
 '/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) ->
-    case emqx_connector:lookup(ConnType, ConnName) of
+    case emqx_connector:lookup_raw(ConnType, ConnName) of
         {ok, _} ->
             {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
         {error, not_found} ->
@@ -218,13 +218,13 @@ schema("/connectors/:id") ->
                     {201, format_resp(RawConf#{<<"type">> => ConnType,
                                                <<"name">> => ConnName})};
                 {error, Error} ->
-                    {400, error_msg('ALREADY_EXISTS', Error)}
+                    {400, error_msg('BAD_REQUEST', Error)}
             end
     end.
 
 '/connectors/:id'(get, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id,
-        case emqx_connector:lookup(ConnType, ConnName) of
+        case emqx_connector:lookup_raw(ConnType, ConnName) of
             {ok, Conf} ->
                 {200, format_resp(Conf)};
             {error, not_found} ->
@@ -234,7 +234,7 @@ schema("/connectors/:id") ->
 '/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
     Params = filter_out_request_body(Params0),
     ?TRY_PARSE_ID(Id,
-        case emqx_connector:lookup(ConnType, ConnName) of
+        case emqx_connector:lookup_raw(ConnType, ConnName) of
             {ok, _} ->
                 case emqx_connector:update(ConnType, ConnName, Params) of
                     {ok, #{raw_config := RawConf}} ->
@@ -249,7 +249,7 @@ schema("/connectors/:id") ->
 
 '/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id,
-        case emqx_connector:lookup(ConnType, ConnName) of
+        case emqx_connector:lookup_raw(ConnType, ConnName) of
             {ok, _} ->
                 case emqx_connector:delete(ConnType, ConnName) of
                     {ok, _} ->

+ 2 - 11
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -114,15 +114,6 @@ set_special_configs(_) ->
 
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    %% assert we there's no connectors and no bridges at first
-    {ok, 200, Connectors} = request(get, uri(["connectors"]), []),
-    lists:foreach(fun(#{<<"id">> := ConnectorID}) ->
-        {ok, 200, <<>>} = request(delete, uri(["connectors", ConnectorID]), [])
-                  end, jsx:decode(Connectors)),
-    {ok, 200, Bridges} = request(get, uri(["bridges"]), []),
-    lists:foreach(fun(#{<<"id">> := BridgeID}) ->
-        {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), [])
-              end,  jsx:decode(Bridges)),
     Config.
 end_per_testcase(_, _Config) ->
     clear_resources(),
@@ -135,9 +126,9 @@ clear_resources() ->
     lists:foreach(fun(#{type := Type, name := Name}) ->
             ok = emqx_bridge:remove(Type, Name)
         end, emqx_bridge:list()),
-    lists:foreach(fun(#{type := Type, name := Name}) ->
+    lists:foreach(fun(#{<<"type">> := Type, <<"name">> := Name}) ->
             ok = emqx_connector:delete(Type, Name)
-        end, emqx_connector:list()).
+        end, emqx_connector:list_raw()).
 
 %%------------------------------------------------------------------------------
 %% Testcases

+ 1 - 1
apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl

@@ -71,7 +71,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         ?CONNECTOR_RESOURCE_GROUP,
         ?MYSQL_RESOURCE_MOD,
         CheckedConfig,
-        #{waiting_connect_complete => 5000}
+        #{}
     ),
     ?assertEqual(InitialStatus, connected),
     % Instance should match the state and status of the just started resource

+ 1 - 1
apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl

@@ -72,7 +72,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         ?CONNECTOR_RESOURCE_GROUP,
         ?PGSQL_RESOURCE_MOD,
         CheckedConfig,
-        #{waiting_connect_complete => 5000}
+        #{}
     ),
     ?assertEqual(InitialStatus, connected),
     % Instance should match the state and status of the just started resource

+ 1 - 1
apps/emqx_connector/test/emqx_connector_redis_SUITE.erl

@@ -86,7 +86,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
         ?CONNECTOR_RESOURCE_GROUP,
         ?REDIS_RESOURCE_MOD,
         CheckedConfig,
-        #{waiting_connect_complete => 5000}
+        #{}
     ),
     ?assertEqual(InitialStatus, connected),
     % Instance should match the state and status of the just started resource

+ 2 - 0
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -157,6 +157,8 @@ init(Name) ->
     persistent_term:put(?CntrRef(Name), #{}),
     {ok, #state{}}.
 
+handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
+    {reply, #{}, State};
 handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
     {reply, case maps:get(Id, Rates, undefined) of
                 undefined -> #{};

+ 10 - 7
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -178,14 +178,17 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) ->
             {error, not_found}
     end.
 
-wait_for_resource_ready(InstId, 0) ->
-    force_lookup(InstId);
-wait_for_resource_ready(InstId, Retry) ->
+wait_for_resource_ready(InstId, WaitTime) ->
+    do_wait_for_resource_ready(InstId, WaitTime div 100).
+
+do_wait_for_resource_ready(_InstId, 0) ->
+    timeout;
+do_wait_for_resource_ready(InstId, Retry) ->
     case force_lookup(InstId) of
-        #{status := connected} = Data -> Data;
+        #{status := connected} -> ok;
         _ ->
             timer:sleep(100),
-            wait_for_resource_ready(InstId, Retry-1)
+            do_wait_for_resource_ready(InstId, Retry-1)
     end.
 
 do_create(InstId, Group, ResourceType, Config, Opts) ->
@@ -197,8 +200,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
                 ok ->
                     ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
                             [matched, success, failed, exception], [matched]),
-                    WaitTime = maps:get(waiting_connect_complete , Opts, 0),
-                    {ok, wait_for_resource_ready(InstId, WaitTime div 100)};
+                    {ok, force_lookup(InstId)};
                 Error ->
                     Error
             end
@@ -252,6 +254,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
     spawn(fun() ->
             start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
         end),
+    _ = wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
     ok.
 
 start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.erl

@@ -361,7 +361,7 @@ create_resource(Context, #{type := DB} = Config) ->
            <<"emqx_retainer">>,
            list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
            Config,
-           #{waiting_connect_complete => 5000}) of
+           #{}) of
         {ok, already_created} ->
             Context#{resource_id => ResourceID};
         {ok, _} ->