ソースを参照

fix(bridge_api): don't crash when formatting empty/unknown bridge metrics

Fixes https://emqx.atlassian.net/browse/EMQX-9872
Thales Macedo Garitezi 2 年 前
コミット
09ea2e2224

+ 7 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -653,11 +653,17 @@ emqx_cluster(Specs0, CommonOpts) ->
     ]),
     %% Set the default node of the cluster:
     CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
-    JoinTo =
+    JoinTo0 =
         case CoreNodes of
             [First | _] -> First;
             _ -> undefined
         end,
+    JoinTo =
+        case maps:find(join_to, CommonOpts) of
+            {ok, true} -> JoinTo0;
+            {ok, JT} -> JT;
+            error -> JoinTo0
+        end,
     [
         {Name,
             merge_opts(Opts, #{

+ 23 - 0
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -882,6 +882,29 @@ format_metrics(#{
         Rate5m,
         RateMax,
         Rcvd
+    );
+format_metrics(_Metrics) ->
+    %% Empty metrics: can happen when a node joins another and a
+    %% bridge is not yet replicated to it, so the counters map is
+    %% empty.
+    ?METRICS(
+        _Dropped = 0,
+        _DroppedOther = 0,
+        _DroppedExpired = 0,
+        _DroppedQueueFull = 0,
+        _DroppedResourceNotFound = 0,
+        _DroppedResourceStopped = 0,
+        _Matched = 0,
+        _Queued = 0,
+        _Retried = 0,
+        _LateReply = 0,
+        _SentFailed = 0,
+        _SentInflight = 0,
+        _SentSucc = 0,
+        _Rate = 0,
+        _Rate5m = 0,
+        _RateMax = 0,
+        _Rcvd = 0
     ).
 
 fill_defaults(Type, RawConf) ->

+ 63 - 3
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -70,18 +70,22 @@
 all() ->
     [
         {group, single},
+        {group, cluster_later_join},
         {group, cluster}
     ].
 
 groups() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
     SingleOnlyTests = [
         t_broken_bpapi_vsn,
         t_old_bpapi_vsn,
         t_bridges_probe
     ],
+    ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
     [
-        {single, [], emqx_common_test_helpers:all(?MODULE)},
-        {cluster, [], emqx_common_test_helpers:all(?MODULE) -- SingleOnlyTests}
+        {single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
+        {cluster_later_join, [], ClusterLaterJoinOnlyTCs},
+        {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
     ].
 
 suite() ->
@@ -104,6 +108,17 @@ init_per_group(cluster, Config) ->
     ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
     _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
     [{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
+init_per_group(cluster_later_join, Config) ->
+    Cluster = mk_cluster_specs(Config, #{join_to => undefined}),
+    ct:pal("Starting ~p", [Cluster]),
+    Nodes = [
+        emqx_common_test_helpers:start_slave(Name, Opts)
+     || {Name, Opts} <- Cluster
+    ],
+    [NodePrimary | NodesRest] = Nodes,
+    ok = erpc:call(NodePrimary, fun() -> init_node(primary) end),
+    _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest],
+    [{group, cluster_later_join}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config];
 init_per_group(_, Config) ->
     ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS),
     ok = load_suite_config(emqx_rule_engine),
@@ -111,6 +126,9 @@ init_per_group(_, Config) ->
     [{group, single}, {api_node, node()} | Config].
 
 mk_cluster_specs(Config) ->
+    mk_cluster_specs(Config, #{}).
+
+mk_cluster_specs(Config, Opts) ->
     Specs = [
         {core, emqx_bridge_api_SUITE1, #{}},
         {core, emqx_bridge_api_SUITE2, #{}}
@@ -132,6 +150,7 @@ mk_cluster_specs(Config) ->
         load_apps => ?SUITE_APPS ++ [emqx_dashboard],
         env_handler => fun load_suite_config/1,
         load_schema => false,
+        join_to => maps:get(join_to, Opts, true),
         priv_data_dir => ?config(priv_dir, Config)
     },
     emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts).
@@ -164,7 +183,10 @@ load_suite_config(emqx_bridge) ->
 load_suite_config(_) ->
     ok.
 
-end_per_group(cluster, Config) ->
+end_per_group(Group, Config) when
+    Group =:= cluster;
+    Group =:= cluster_later_join
+->
     ok = lists:foreach(
         fun(Node) ->
             _ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]),
@@ -1298,6 +1320,44 @@ t_inconsistent_webhook_request_timeouts(Config) ->
     validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
     ok.
 
+t_cluster_later_join_metrics(Config) ->
+    Port = ?config(port, Config),
+    APINode = ?config(api_node, Config),
+    ClusterNodes = ?config(cluster_nodes, Config),
+    [OtherNode | _] = ClusterNodes -- [APINode],
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    BridgeParams = ?HTTP_BRIDGE(URL1, Name),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+    ?check_trace(
+        begin
+            %% Create a bridge on only one of the nodes.
+            ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)),
+            %% Pre-condition.
+            ?assertMatch(
+                {ok, 200, #{
+                    <<"metrics">> := #{<<"success">> := _},
+                    <<"node_metrics">> := [_ | _]
+                }},
+                request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
+            ),
+            %% Now join the other node join with the api node.
+            ok = erpc:call(OtherNode, ekka, join, [APINode]),
+            %% Check metrics; shouldn't crash even if the bridge is not
+            %% ready on the node that just joined the cluster.
+            ?assertMatch(
+                {ok, 200, #{
+                    <<"metrics">> := #{<<"success">> := _},
+                    <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
+                }},
+                request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 validate_resource_request_timeout(single, Timeout, Name) ->
     SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
     BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),

+ 1 - 0
changes/ce/fix-10743.en.md

@@ -0,0 +1 @@
+Fixes an issue where trying to get a bridge info or metrics could result in a crash when a node is joining a cluster.