Просмотр исходного кода

feat: allow to manually re-connect disconected bridge

Stefan Strigler 3 лет назад
Родитель
Сommit
86f3f5787f

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -3,6 +3,7 @@
 {emqx_authn,1}.
 {emqx_authz,1}.
 {emqx_bridge,1}.
+{emqx_bridge,2}.
 {emqx_broker,1}.
 {emqx_cm,1}.
 {emqx_conf,1}.

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.10"},
+    {vsn, "0.1.11"},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 33 - 8
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -413,8 +413,9 @@ schema("/bridges/:id/:operation") ->
             ],
             responses => #{
                 204 => <<"Operation success">>,
-                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable"),
-                400 => error_schema('INVALID_ID', "Bad bridge ID")
+                400 => error_schema('INVALID_ID', "Bad bridge ID"),
+                501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
+                503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
             }
         }
     };
@@ -434,6 +435,7 @@ schema("/nodes/:node/bridges/:id/:operation") ->
                 204 => <<"Operation success">>,
                 400 => error_schema('INVALID_ID', "Bad bridge ID"),
                 403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
+                501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
                 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
             }
         }
@@ -640,10 +642,12 @@ lookup_from_local_node(BridgeType, BridgeName) ->
         end
     ).
 
+node_operation_func(<<"start">>) -> start_bridge_to_node;
 node_operation_func(<<"stop">>) -> stop_bridge_to_node;
 node_operation_func(<<"restart">>) -> restart_bridge_to_node;
 node_operation_func(_) -> invalid.
 
+operation_func(<<"start">>) -> start;
 operation_func(<<"stop">>) -> stop;
 operation_func(<<"restart">>) -> restart;
 operation_func(_) -> invalid.
@@ -656,11 +660,14 @@ operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
     RpcFunc =
         case OperFunc of
             restart -> restart_bridges_to_all_nodes;
+            start -> start_bridges_to_all_nodes;
             stop -> stop_bridges_to_all_nodes
         end,
-    case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
+    case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of
         {ok, _} ->
             {204};
+        {error, not_implemented} ->
+            {501};
         {error, [timeout | _]} ->
             {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
         {error, ErrL} ->
@@ -858,6 +865,8 @@ unpack_bridge_conf(Type, PackedConf) ->
     #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
     RawConf.
 
+is_ok(Error = {error, _}) ->
+    Error;
 is_ok(ResL) ->
     case
         lists:filter(
@@ -899,13 +908,11 @@ bin(S) when is_binary(S) ->
 call_operation(Node, OperFunc, BridgeType, BridgeName) ->
     case emqx_misc:safe_to_existing_atom(Node, utf8) of
         {ok, TargetNode} ->
-            case
-                emqx_bridge_proto_v1:OperFunc(
-                    TargetNode, BridgeType, BridgeName
-                )
-            of
+            case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of
                 ok ->
                     {204};
+                {error, not_implemented} ->
+                    {501};
                 {error, timeout} ->
                     {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
                 {error, {start_pool_failed, Name, Reason}} ->
@@ -926,6 +933,24 @@ call_operation(Node, OperFunc, BridgeType, BridgeName) ->
             {400, error_msg('INVALID_NODE', <<"invalid node">>)}
     end.
 
+do_bpapi_call(Call, Args) ->
+    do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args).
+
+do_bpapi_call(Node, Call, Args) ->
+    do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args).
+
+do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
+    case lists:member(SupportedVersion, supported_versions(Call)) of
+        true ->
+            apply(emqx_bridge_proto_v2, Call, Args);
+        false ->
+            {error, not_implemented}
+    end.
+
+supported_versions(start_bridge_to_node) -> [2];
+supported_versions(start_bridges_to_all_nodes) -> [2];
+supported_versions(_Call) -> [1, 2].
+
 redact(Term) ->
     emqx_misc:redact(Term).
 

+ 5 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 -module(emqx_bridge_resource).
--include_lib("emqx/include/emqx.hrl").
+
 -include_lib("emqx/include/logger.hrl").
 
 -export([
@@ -38,6 +38,7 @@
     update/2,
     update/3,
     update/4,
+    start/2,
     stop/2,
     restart/2,
     reset_metrics/1
@@ -121,6 +122,9 @@ reset_metrics(ResourceId) ->
 stop(Type, Name) ->
     emqx_resource:stop(resource_id(Type, Name)).
 
+start(Type, Name) ->
+    emqx_resource:start(resource_id(Type, Name)).
+
 %% we don't provide 'start', as we want an already started bridge to be restarted.
 restart(Type, Name) ->
     emqx_resource:restart(resource_id(Type, Name)).

+ 4 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl

@@ -20,6 +20,7 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
 
     list_bridges/1,
     restart_bridge_to_node/3,
@@ -36,6 +37,9 @@
 introduced_in() ->
     "5.0.0".
 
+deprecated_since() ->
+    "5.0.17".
+
 -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
 list_bridges(Node) ->
     rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

+ 122 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl

@@ -0,0 +1,122 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    list_bridges/1,
+    restart_bridge_to_node/3,
+    start_bridge_to_node/3,
+    stop_bridge_to_node/3,
+    lookup_from_all_nodes/3,
+    restart_bridges_to_all_nodes/3,
+    start_bridges_to_all_nodes/3,
+    stop_bridges_to_all_nodes/3
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.17".
+
+-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
+list_bridges(Node) ->
+    rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
+
+-type key() :: atom() | binary() | [byte()].
+
+-spec restart_bridge_to_node(node(), key(), key()) ->
+    term().
+restart_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridge_to_node(node(), key(), key()) ->
+    term().
+start_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridge_to_node(node(), key(), key()) ->
+    term().
+stop_bridge_to_node(Node, BridgeType, BridgeName) ->
+    rpc:call(
+        Node,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        restart,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec start_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        start,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_resource,
+        stop,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).
+
+-spec lookup_from_all_nodes([node()], key(), key()) ->
+    emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(
+        Nodes,
+        emqx_bridge_api,
+        lookup_from_local_node,
+        [BridgeType, BridgeName],
+        ?TIMEOUT
+    ).

+ 33 - 7
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -82,11 +82,19 @@ end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]),
     ok.
 
+init_per_testcase(t_bad_bpapi_vsn, Config) ->
+    meck:new(emqx_bpapi, [passthrough]),
+    meck:expect(emqx_bpapi, supported_version, 1, 1),
+    meck:expect(emqx_bpapi, supported_version, 2, 1),
+    init_per_testcase(commong, Config);
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
     {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
     [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
 
+end_per_testcase(t_bad_bpapi_vsn, Config) ->
+    meck:unload([emqx_bpapi]),
+    end_per_testcase(commong, Config);
 end_per_testcase(_, Config) ->
     Sock = ?config(sock, Config),
     Acceptor = ?config(acceptor, Config),
@@ -440,6 +448,20 @@ t_start_stop_bridges_node(Config) ->
 t_start_stop_bridges_cluster(Config) ->
     do_start_stop_bridges(cluster, Config).
 
+t_bad_bpapi_vsn(Config) ->
+    Port = ?config(port, Config),
+    URL1 = ?URL(Port, "abc"),
+    Name = <<"t_bad_bpapi_vsn">>,
+    {ok, 201, _Bridge} = request(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
+    ),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+    {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
+    {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
+    ok.
+
 do_start_stop_bridges(Type, Config) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -463,21 +485,25 @@ do_start_stop_bridges(Type, Config) ->
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% stop it
-    {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% start again
-    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
+    %% start a started bridge
+    {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
+    {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []),
+    ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)),
     %% restart an already started bridge
-    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
     %% stop it again
-    {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     %% restart a stopped bridge
-    {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
+    {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
     ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
     %% delete the bridge
@@ -536,7 +562,7 @@ t_enable_disable_bridges(Config) ->
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
 t_reset_bridges(Config) ->
-    %% assert we there's no bridges at first
+    %% assert there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     Name = ?BRIDGE_NAME,
@@ -557,7 +583,7 @@ t_reset_bridges(Config) ->
         <<"url">> := URL1
     } = jsx:decode(Bridge),
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
-    {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
+    {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
 
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 5 - 3
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -17,7 +17,6 @@
 -behaviour(gen_statem).
 
 -include("emqx_resource.hrl").
--include("emqx_resource_utils.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 % API
@@ -327,8 +326,11 @@ handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
 handle_event({call, From}, restart, _State, Data) ->
     _ = stop_resource(Data),
     start_resource(Data, From);
-% Called when the resource is to be started
-handle_event({call, From}, start, stopped, Data) ->
+% Called when the resource is to be started (also used for manual reconnect)
+handle_event({call, From}, start, State, Data) when
+    State =:= stopped orelse
+        State =:= disconnected
+->
     start_resource(Data, From);
 handle_event({call, From}, start, _State, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};

+ 1 - 0
changes/v5.0.17/feat-9910.en.md

@@ -0,0 +1 @@
+Add `start` operation to bridges API to allow manual reconnect after failure.

+ 1 - 0
changes/v5.0.17/feat-9910.zh.md

@@ -0,0 +1 @@
+在桥梁 API 中增加 `start` "操作,允许失败后手动重新连接。