소스 검색

Merge pull request #11869 from kjellwinblad/kjell/shared_con/start_feedback/EMQX-11304

fix(bridge_v2): start operation should return an error when unsuccessful
Kjell Winblad 2 년 전
부모
커밋
65ba0b9de4

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -1078,7 +1078,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
             ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>);
         {error, {unhealthy_target, Message}} ->
             ?BAD_REQUEST(Message);
-        {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
+        {error, Reason} ->
             ?BAD_REQUEST(redact(Reason))
     end.
 

+ 50 - 13
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -434,39 +434,71 @@ disable_enable(Action, BridgeType, BridgeName) when
 
 %% Manually start connector. This function can speed up reconnection when
 %% waiting for auto reconnection. The function forwards the start request to
-%% its connector.
+%% its connector. Returns ok if the status of the bridge is connected after
+%% starting the connector. Returns {error, Reason} if the status of the bridge
+%% is anything other than connected after starting the connector, or if an
+%% error occurred when the connector was started.
+-spec start(term(), term()) -> ok | {error, Reason :: term()}.
 start(BridgeV2Type, Name) ->
     ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
         emqx_connector_resource:start(ConnectorType, ConnectorName)
     end,
-    connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun).
+    connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, true).
 
-connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun) ->
+connector_operation_helper(BridgeV2Type, Name, ConnectorOpFun, DoHealthCheck) ->
     connector_operation_helper_with_conf(
         BridgeV2Type,
+        Name,
         lookup_conf(BridgeV2Type, Name),
-        ConnectorOpFun
+        ConnectorOpFun,
+        DoHealthCheck
     ).
 
 connector_operation_helper_with_conf(
     _BridgeV2Type,
+    _Name,
     {error, bridge_not_found} = Error,
-    _ConnectorOpFun
+    _ConnectorOpFun,
+    _DoHealthCheck
 ) ->
     Error;
 connector_operation_helper_with_conf(
     _BridgeV2Type,
+    _Name,
     #{enable := false},
-    _ConnectorOpFun
+    _ConnectorOpFun,
+    _DoHealthCheck
 ) ->
     ok;
 connector_operation_helper_with_conf(
     BridgeV2Type,
+    Name,
     #{connector := ConnectorName},
-    ConnectorOpFun
+    ConnectorOpFun,
+    DoHealthCheck
 ) ->
     ConnectorType = connector_type(BridgeV2Type),
-    ConnectorOpFun(ConnectorType, ConnectorName).
+    ConnectorOpFunResult = ConnectorOpFun(ConnectorType, ConnectorName),
+    case {DoHealthCheck, ConnectorOpFunResult} of
+        {false, _} ->
+            ConnectorOpFunResult;
+        {true, {error, Reason}} ->
+            {error, Reason};
+        {true, ok} ->
+            case health_check(BridgeV2Type, Name) of
+                #{status := connected} ->
+                    ok;
+                {error, Reason} ->
+                    {error, Reason};
+                #{status := Status, error := Reason} ->
+                    Msg = io_lib:format(
+                        "Connector started but bridge (~s:~s) is not connected. "
+                        "Bridge Status: ~p, Error: ~p",
+                        [bin(BridgeV2Type), bin(Name), Status, Reason]
+                    ),
+                    {error, iolist_to_binary(Msg)}
+            end
+    end.
 
 reset_metrics(Type, Name) ->
     reset_metrics_helper(Type, Name, lookup_conf(Type, Name)).
@@ -513,6 +545,9 @@ do_send_msg_with_enabled_config(
     BridgeV2Id = id(BridgeType, BridgeName),
     emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
 
+-spec health_check(BridgeType :: term(), BridgeName :: term()) ->
+    #{status := term(), error := term()} | {error, Reason :: term()}.
+
 health_check(BridgeType, BridgeName) ->
     case lookup_conf(BridgeType, BridgeName) of
         #{
@@ -1365,28 +1400,30 @@ bridge_v1_restart(BridgeV1Type, Name) ->
     ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
         emqx_connector_resource:restart(ConnectorType, ConnectorName)
     end,
-    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
+    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true).
 
 bridge_v1_stop(BridgeV1Type, Name) ->
     ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
         emqx_connector_resource:stop(ConnectorType, ConnectorName)
     end,
-    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
+    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, false).
 
 bridge_v1_start(BridgeV1Type, Name) ->
     ConnectorOpFun = fun(ConnectorType, ConnectorName) ->
         emqx_connector_resource:start(ConnectorType, ConnectorName)
     end,
-    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun).
+    bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, true).
 
-bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun) ->
+bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
     BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
     case emqx_bridge_v2:is_valid_bridge_v1(BridgeV1Type, Name) of
         true ->
             connector_operation_helper_with_conf(
                 BridgeV2Type,
+                Name,
                 lookup_conf(BridgeV2Type, Name),
-                ConnectorOpFun
+                ConnectorOpFun,
+                DoHealthCheck
             );
         false ->
             {error, not_bridge_v1_compatible}

+ 1 - 3
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -606,9 +606,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
             ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>);
         {error, {node_not_found, Node}} ->
             ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>);
-        {error, {unhealthy_target, Message}} ->
-            ?BAD_REQUEST(Message);
-        {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
+        {error, Reason} ->
             ?BAD_REQUEST(redact(Reason))
     end.
 

+ 38 - 0
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -806,6 +806,44 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
     ),
     ok.
 
+t_start_operation_when_on_add_channel_gives_error(_Config) ->
+    Conf = bridge_config(),
+    BridgeName = my_test_bridge,
+    emqx_common_test_helpers:with_mock(
+        emqx_bridge_v2_test_connector,
+        on_add_channel,
+        fun(_, _, _ResId, _Channel) -> {error, <<"some_error">>} end,
+        fun() ->
+            %% We can create the bridge even though on_add_channel returns an error
+            ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), BridgeName, Conf)),
+            ?assertMatch(
+                #{
+                    status := disconnected,
+                    error := <<"some_error">>
+                },
+                emqx_bridge_v2:health_check(bridge_type(), BridgeName)
+            ),
+            ?assertMatch(
+                {ok, #{
+                    status := disconnected,
+                    error := <<"some_error">>
+                }},
+                emqx_bridge_v2:lookup(bridge_type(), BridgeName)
+            ),
+            %% emqx_bridge_v2:start/2 should return ok if the bridge is connected
+            %% after start, and an error otherwise
+            ?assertMatch({error, _}, emqx_bridge_v2:start(bridge_type(), BridgeName)),
+            %% Let us change on_add_channel to be successful and try again
+            ok = meck:expect(
+                emqx_bridge_v2_test_connector,
+                on_add_channel,
+                fun(_, _, _ResId, _Channel) -> {ok, #{}} end
+            ),
+            ?assertMatch(ok, emqx_bridge_v2:start(bridge_type(), BridgeName))
+        end
+    ),
+    ok.
+
 %% Helper Functions
 
 wait_until(Fun) ->

+ 42 - 0
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -147,6 +147,7 @@
     emqx,
     emqx_auth,
     emqx_management,
+    emqx_connector,
     {emqx_bridge, "bridges_v2 {}"},
     {emqx_rule_engine, "rule_engine { rules {} }"}
 ]).
@@ -280,6 +281,9 @@ init_mocks() ->
     meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
     meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}),
     meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected),
+    ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
+        emqx_bridge_v2:get_channels_for_connector(ResId)
+    end),
     [?CONNECTOR_IMPL, emqx_connector_ee_schema].
 
 clear_resources() ->
@@ -504,6 +508,35 @@ do_start_bridge(TestType, Config) ->
 
     {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config),
 
+    %% Make start bridge fail
+    expect_on_all_nodes(
+        ?CONNECTOR_IMPL,
+        on_add_channel,
+        fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end,
+        Config
+    ),
+    ConnectorID = emqx_connector_resource:connector_id(?BRIDGE_TYPE, ?CONNECTOR_NAME),
+    {ok, 204, <<>>} = emqx_connector_api_SUITE:request(
+        post, {operation, TestType, stop, ConnectorID}, Config
+    ),
+    {ok, 204, <<>>} = emqx_connector_api_SUITE:request(
+        post, {operation, TestType, start, ConnectorID}, Config
+    ),
+
+    {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config),
+
+    %% Make start bridge succeed
+
+    expect_on_all_nodes(
+        ?CONNECTOR_IMPL,
+        on_add_channel,
+        fun(_, _, _ResId, _Channel) -> {ok, connector_state} end,
+        Config
+    ),
+
+    %% try to start again
+    {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config),
+
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config),
     {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
@@ -514,6 +547,15 @@ do_start_bridge(TestType, Config) ->
     {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config),
     ok.
 
+expect_on_all_nodes(Mod, Function, Fun, Config) ->
+    case ?config(cluster_nodes, Config) of
+        undefined ->
+            ok = meck:expect(Mod, Function, Fun);
+        Nodes ->
+            [erpc:call(Node, meck, expect, [Mod, Function, Fun]) || Node <- Nodes]
+    end,
+    ok.
+
 %% t_start_stop_inconsistent_bridge_node(Config) ->
 %%     start_stop_inconsistent_bridge(node, Config).