|
|
@@ -396,13 +396,13 @@ t_start_connector_unknown_node(Config) ->
|
|
|
Config
|
|
|
).
|
|
|
|
|
|
-t_start_stop_connectors_node(Config) ->
|
|
|
- do_start_stop_connectors(node, Config).
|
|
|
+t_start_connector_node(Config) ->
|
|
|
+ do_start_connector(node, Config).
|
|
|
|
|
|
-t_start_stop_connectors_cluster(Config) ->
|
|
|
- do_start_stop_connectors(cluster, Config).
|
|
|
+t_start_connector_cluster(Config) ->
|
|
|
+ do_start_connector(cluster, Config).
|
|
|
|
|
|
-do_start_stop_connectors(TestType, Config) ->
|
|
|
+do_start_connector(TestType, Config) ->
|
|
|
%% assert we there's no connectors at first
|
|
|
{ok, 200, []} = request_json(get, uri(["connectors"]), Config),
|
|
|
|
|
|
@@ -424,6 +424,14 @@ do_start_stop_connectors(TestType, Config) ->
|
|
|
),
|
|
|
|
|
|
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
|
|
|
+
|
|
|
+ %% Starting a healthy connector shouldn't do any harm
|
|
|
+ {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{<<"status">> := <<"connected">>}},
|
|
|
+ request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
ExpectedStatus =
|
|
|
case ?config(group, Config) of
|
|
|
cluster when TestType == node ->
|
|
|
@@ -433,7 +441,23 @@ do_start_stop_connectors(TestType, Config) ->
|
|
|
end,
|
|
|
|
|
|
%% stop it
|
|
|
- {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config),
|
|
|
+ case ?config(group, Config) of
|
|
|
+ cluster ->
|
|
|
+ case TestType of
|
|
|
+ node ->
|
|
|
+ Node = ?config(node, Config),
|
|
|
+ ok = rpc:call(
|
|
|
+ Node, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500
|
|
|
+ );
|
|
|
+ cluster ->
|
|
|
+ Nodes = ?config(cluster_nodes, Config),
|
|
|
+ [{ok, ok}, {ok, ok}] = erpc:multicall(
|
|
|
+ Nodes, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500
|
|
|
+ )
|
|
|
+ end;
|
|
|
+ _ ->
|
|
|
+ ok = emqx_connector_resource:stop(?CONNECTOR_TYPE, Name)
|
|
|
+ end,
|
|
|
?assertMatch(
|
|
|
{ok, 200, #{<<"status">> := ExpectedStatus}},
|
|
|
request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
@@ -444,27 +468,8 @@ do_start_stop_connectors(TestType, Config) ->
|
|
|
{ok, 200, #{<<"status">> := <<"connected">>}},
|
|
|
request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
),
|
|
|
- %% start a started connector
|
|
|
- {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config),
|
|
|
- ?assertMatch(
|
|
|
- {ok, 200, #{<<"status">> := <<"connected">>}},
|
|
|
- request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
- ),
|
|
|
- %% restart an already started connector
|
|
|
- {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config),
|
|
|
- ?assertMatch(
|
|
|
- {ok, 200, #{<<"status">> := <<"connected">>}},
|
|
|
- request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
- ),
|
|
|
- %% stop it again
|
|
|
- {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config),
|
|
|
- %% restart a stopped connector
|
|
|
- {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config),
|
|
|
- ?assertMatch(
|
|
|
- {ok, 200, #{<<"status">> := <<"connected">>}},
|
|
|
- request_json(get, uri(["connectors", ConnectorID]), Config)
|
|
|
- ),
|
|
|
|
|
|
+ %% test invalid op
|
|
|
{ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config),
|
|
|
|
|
|
%% delete the connector
|
|
|
@@ -506,43 +511,6 @@ do_start_stop_connectors(TestType, Config) ->
|
|
|
ok = gen_tcp:close(Sock),
|
|
|
ok.
|
|
|
|
|
|
-t_start_stop_inconsistent_connector_node(Config) ->
|
|
|
- start_stop_inconsistent_connector(node, Config).
|
|
|
-
|
|
|
-t_start_stop_inconsistent_connector_cluster(Config) ->
|
|
|
- start_stop_inconsistent_connector(cluster, Config).
|
|
|
-
|
|
|
-start_stop_inconsistent_connector(Type, Config) ->
|
|
|
- Node = ?config(node, Config),
|
|
|
-
|
|
|
- erpc:call(Node, fun() ->
|
|
|
- meck:new(emqx_connector_resource, [passthrough, no_link]),
|
|
|
- meck:expect(
|
|
|
- emqx_connector_resource,
|
|
|
- stop,
|
|
|
- fun
|
|
|
- (_, <<"connector_not_found">>) -> {error, not_found};
|
|
|
- (ConnectorType, Name) -> meck:passthrough([ConnectorType, Name])
|
|
|
- end
|
|
|
- )
|
|
|
- end),
|
|
|
-
|
|
|
- emqx_common_test_helpers:on_exit(fun() ->
|
|
|
- erpc:call(Node, fun() ->
|
|
|
- meck:unload([emqx_connector_resource])
|
|
|
- end)
|
|
|
- end),
|
|
|
-
|
|
|
- {ok, 201, _Connector} = request(
|
|
|
- post,
|
|
|
- uri(["connectors"]),
|
|
|
- ?KAFKA_CONNECTOR(<<"connector_not_found">>),
|
|
|
- Config
|
|
|
- ),
|
|
|
- {ok, 503, _} = request(
|
|
|
- post, {operation, Type, stop, <<"kafka_producer:connector_not_found">>}, Config
|
|
|
- ).
|
|
|
-
|
|
|
t_enable_disable_connectors(Config) ->
|
|
|
%% assert we there's no connectors at first
|
|
|
{ok, 200, []} = request_json(get, uri(["connectors"]), Config),
|