|
|
@@ -56,6 +56,7 @@
|
|
|
-define(CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)).
|
|
|
-define(CONNECTOR, ?CONNECTOR(?CONNECTOR_NAME)).
|
|
|
|
|
|
+-define(MQTT_LOCAL_TOPIC, <<"mqtt/local/topic">>).
|
|
|
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
|
|
|
-define(BRIDGE_TYPE_STR, "kafka_producer").
|
|
|
-define(BRIDGE_TYPE, <<?BRIDGE_TYPE_STR>>).
|
|
|
@@ -93,7 +94,7 @@
|
|
|
<<"required_acks">> => <<"all_isr">>,
|
|
|
<<"topic">> => <<"kafka-topic">>
|
|
|
},
|
|
|
- <<"local_topic">> => <<"mqtt/local/topic">>,
|
|
|
+ <<"local_topic">> => ?MQTT_LOCAL_TOPIC,
|
|
|
<<"resource_opts">> => #{
|
|
|
<<"health_check_interval">> => <<"32s">>
|
|
|
}
|
|
|
@@ -105,48 +106,6 @@
|
|
|
).
|
|
|
-define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?CONNECTOR_NAME)).
|
|
|
|
|
|
-%% -define(BRIDGE_TYPE_MQTT, <<"mqtt">>).
|
|
|
-%% -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{
|
|
|
-%% <<"server">> => SERVER,
|
|
|
-%% <<"username">> => <<"user1">>,
|
|
|
-%% <<"password">> => <<"">>,
|
|
|
-%% <<"proto_ver">> => <<"v5">>,
|
|
|
-%% <<"egress">> => #{
|
|
|
-%% <<"remote">> => #{
|
|
|
-%% <<"topic">> => <<"emqx/${topic}">>,
|
|
|
-%% <<"qos">> => <<"${qos}">>,
|
|
|
-%% <<"retain">> => false
|
|
|
-%% }
|
|
|
-%% }
|
|
|
-%% }).
|
|
|
-%% -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
|
|
|
-
|
|
|
-%% -define(BRIDGE_TYPE_HTTP, <<"kafka">>).
|
|
|
-%% -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{
|
|
|
-%% <<"url">> => URL,
|
|
|
-%% <<"local_topic">> => <<"emqx_webhook/#">>,
|
|
|
-%% <<"method">> => <<"post">>,
|
|
|
-%% <<"body">> => <<"${payload}">>,
|
|
|
-%% <<"headers">> => #{
|
|
|
-%% % NOTE
|
|
|
-%% % The Pascal-Case is important here.
|
|
|
-%% % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts
|
|
|
-%% % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS
|
|
|
-%% % when this happens (while the 'content-type' does not).
|
|
|
-%% <<"Content-Type">> => <<"application/json">>
|
|
|
-%% }
|
|
|
-%% }).
|
|
|
-%% -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)).
|
|
|
-
|
|
|
-%% -define(URL(PORT, PATH),
|
|
|
-%% list_to_binary(
|
|
|
-%% io_lib:format(
|
|
|
-%% "http://localhost:~s/~s",
|
|
|
-%% [integer_to_list(PORT), PATH]
|
|
|
-%% )
|
|
|
-%% )
|
|
|
-%% ).
|
|
|
-
|
|
|
-define(APPSPECS, [
|
|
|
emqx_conf,
|
|
|
emqx,
|
|
|
@@ -166,7 +125,7 @@
|
|
|
all() ->
|
|
|
[
|
|
|
{group, single},
|
|
|
- %{group, cluster_later_join},
|
|
|
+ {group, cluster_later_join},
|
|
|
{group, cluster}
|
|
|
].
|
|
|
-else.
|
|
|
@@ -182,7 +141,7 @@ groups() ->
|
|
|
t_fix_broken_bridge_config
|
|
|
],
|
|
|
ClusterLaterJoinOnlyTCs = [
|
|
|
- % t_cluster_later_join_metrics
|
|
|
+ t_cluster_later_join_metrics
|
|
|
],
|
|
|
[
|
|
|
{single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
|
|
|
@@ -202,9 +161,9 @@ end_per_suite(_Config) ->
|
|
|
init_per_group(cluster = Name, Config) ->
|
|
|
Nodes = [NodePrimary | _] = mk_cluster(Name, Config),
|
|
|
init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
|
|
|
-%% init_per_group(cluster_later_join = Name, Config) ->
|
|
|
-%% Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
|
|
|
-%% init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
|
|
|
+init_per_group(cluster_later_join = Name, Config) ->
|
|
|
+ Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}),
|
|
|
+ init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]);
|
|
|
init_per_group(Name, Config) ->
|
|
|
WorkDir = filename:join(?config(priv_dir, Config), Name),
|
|
|
Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
|
|
|
@@ -1041,6 +1000,143 @@ t_bad_name(Config) ->
|
|
|
),
|
|
|
ok.
|
|
|
|
|
|
+t_metrics(Config) ->
|
|
|
+ {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
|
|
|
+
|
|
|
+ ActionName = ?BRIDGE_NAME,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 201, _},
|
|
|
+ request_json(
|
|
|
+ post,
|
|
|
+ uri([?ROOT]),
|
|
|
+ ?KAFKA_BRIDGE(?BRIDGE_NAME),
|
|
|
+ Config
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 0},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, 200, Bridge} = request_json(get, uri([?ROOT, ActionID]), Config),
|
|
|
+ ?assertNot(maps:is_key(<<"metrics">>, Bridge)),
|
|
|
+ ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)),
|
|
|
+
|
|
|
+ Body = <<"my msg">>,
|
|
|
+ _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config),
|
|
|
+
|
|
|
+ %% check for non-empty bridge metrics
|
|
|
+ ?retry(
|
|
|
+ _Sleep0 = 200,
|
|
|
+ _Retries0 = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 1},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 1}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% check for absence of metrics when listing all bridges
|
|
|
+ {ok, 200, Bridges} = request_json(get, uri([?ROOT]), Config),
|
|
|
+ ?assertNotMatch(
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ <<"metrics">> := #{},
|
|
|
+ <<"node_metrics">> := [_ | _]
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ Bridges
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_reset_metrics(Config) ->
|
|
|
+ %% assert there's no bridges at first
|
|
|
+ {ok, 200, []} = request_json(get, uri([?ROOT]), Config),
|
|
|
+
|
|
|
+ ActionName = ?BRIDGE_NAME,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 201, _},
|
|
|
+ request_json(
|
|
|
+ post,
|
|
|
+ uri([?ROOT]),
|
|
|
+ ?KAFKA_BRIDGE(?BRIDGE_NAME),
|
|
|
+ Config
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
|
|
|
+
|
|
|
+ Body = <<"my msg">>,
|
|
|
+ _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config),
|
|
|
+ ?retry(
|
|
|
+ _Sleep0 = 200,
|
|
|
+ _Retries0 = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 1},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, 204, <<>>} = request(put, uri([?ROOT, ActionID, "metrics", "reset"]), Config),
|
|
|
+
|
|
|
+ ?retry(
|
|
|
+ _Sleep0 = 200,
|
|
|
+ _Retries0 = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"matched">> := 0},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_cluster_later_join_metrics(Config) ->
|
|
|
+ [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config),
|
|
|
+ Name = ?BRIDGE_NAME,
|
|
|
+ ActionParams = ?KAFKA_BRIDGE(Name),
|
|
|
+ ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ %% Create a bridge on only one of the nodes.
|
|
|
+ ?assertMatch({ok, 201, _}, request_json(post, uri([?ROOT]), ActionParams, Config)),
|
|
|
+ %% Pre-condition.
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"success">> := _},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ ),
|
|
|
+ %% Now join the other node join with the api node.
|
|
|
+ ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]),
|
|
|
+ %% Check metrics; shouldn't crash even if the bridge is not
|
|
|
+ %% ready on the node that just joined the cluster.
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, 200, #{
|
|
|
+ <<"metrics">> := #{<<"success">> := _},
|
|
|
+ <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
|
|
|
+ }},
|
|
|
+ request_json(get, uri([?ROOT, ActionID, "metrics"]), Config)
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ []
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
%%% helpers
|
|
|
listen_on_random_port() ->
|
|
|
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
|