|
|
@@ -177,7 +177,9 @@ all() ->
|
|
|
groups() ->
|
|
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
|
SingleOnlyTests = [
|
|
|
- t_bridges_probe
|
|
|
+ t_bridges_probe,
|
|
|
+ t_broken_bridge_config,
|
|
|
+ t_fix_broken_bridge_config
|
|
|
],
|
|
|
ClusterLaterJoinOnlyTCs = [
|
|
|
% t_cluster_later_join_metrics
|
|
|
@@ -551,6 +553,117 @@ t_bridges_lifecycle(Config) ->
|
|
|
{ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config),
|
|
|
ok.
|
|
|
|
|
|
+t_broken_bridge_config(Config) ->
|
|
|
+ emqx_cth_suite:stop_apps([emqx_bridge]),
|
|
|
+ BridgeName = ?BRIDGE_NAME,
|
|
|
+ StartOps =
|
|
|
+ #{
|
|
|
+ config =>
|
|
|
+ "actions {\n"
|
|
|
+ " "
|
|
|
+ ?BRIDGE_TYPE_STR
|
|
|
+ " {\n"
|
|
|
+ " " ++ binary_to_list(BridgeName) ++
|
|
|
+ " {\n"
|
|
|
+ " connector = does_not_exist\n"
|
|
|
+ " enable = true\n"
|
|
|
+ " kafka {\n"
|
|
|
+ " topic = test-topic-one-partition\n"
|
|
|
+ " }\n"
|
|
|
+ " local_topic = \"mqtt/local/topic\"\n"
|
|
|
+ " resource_opts {health_check_interval = 32s}\n"
|
|
|
+ " }\n"
|
|
|
+ " }\n"
|
|
|
+ "}\n"
|
|
|
+ "\n",
|
|
|
+ schema_mod => emqx_bridge_v2_schema
|
|
|
+ },
|
|
|
+ emqx_cth_suite:start_app(emqx_bridge, StartOps),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, [
|
|
|
+ #{
|
|
|
+ <<"name">> := BridgeName,
|
|
|
+ <<"type">> := ?BRIDGE_TYPE,
|
|
|
+ <<"connector">> := <<"does_not_exist">>,
|
|
|
+ <<"status">> := <<"disconnected">>,
|
|
|
+ <<"error">> := <<"Pending installation">>
|
|
|
+ }
|
|
|
+ ]},
|
|
|
+ request_json(get, uri([?ROOT]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
|
|
|
+ ?assertEqual(
|
|
|
+ {ok, 204, <<>>},
|
|
|
+ request(delete, uri([?ROOT, BridgeID]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertEqual(
|
|
|
+ {ok, 200, []},
|
|
|
+ request_json(get, uri([?ROOT]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_fix_broken_bridge_config(Config) ->
|
|
|
+ emqx_cth_suite:stop_apps([emqx_bridge]),
|
|
|
+ BridgeName = ?BRIDGE_NAME,
|
|
|
+ StartOps =
|
|
|
+ #{
|
|
|
+ config =>
|
|
|
+ "actions {\n"
|
|
|
+ " "
|
|
|
+ ?BRIDGE_TYPE_STR
|
|
|
+ " {\n"
|
|
|
+ " " ++ binary_to_list(BridgeName) ++
|
|
|
+ " {\n"
|
|
|
+ " connector = does_not_exist\n"
|
|
|
+ " enable = true\n"
|
|
|
+ " kafka {\n"
|
|
|
+ " topic = test-topic-one-partition\n"
|
|
|
+ " }\n"
|
|
|
+ " local_topic = \"mqtt/local/topic\"\n"
|
|
|
+ " resource_opts {health_check_interval = 32s}\n"
|
|
|
+ " }\n"
|
|
|
+ " }\n"
|
|
|
+ "}\n"
|
|
|
+ "\n",
|
|
|
+ schema_mod => emqx_bridge_v2_schema
|
|
|
+ },
|
|
|
+ emqx_cth_suite:start_app(emqx_bridge, StartOps),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, [
|
|
|
+ #{
|
|
|
+ <<"name">> := BridgeName,
|
|
|
+ <<"type">> := ?BRIDGE_TYPE,
|
|
|
+ <<"connector">> := <<"does_not_exist">>,
|
|
|
+ <<"status">> := <<"disconnected">>,
|
|
|
+ <<"error">> := <<"Pending installation">>
|
|
|
+ }
|
|
|
+ ]},
|
|
|
+ request_json(get, uri([?ROOT]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
|
|
|
+ request_json(
|
|
|
+ put,
|
|
|
+ uri([?ROOT, BridgeID]),
|
|
|
+ ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME),
|
|
|
+ Config
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"connector">> := ?CONNECTOR_NAME,
|
|
|
+ <<"status">> := <<"connected">>
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, BridgeID]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
t_start_bridge_unknown_node(Config) ->
|
|
|
{ok, 404, _} =
|
|
|
request(
|