Преглед на файлове

fix(bridge_v2_api): avoid calling nodes that do not support minimum bpapi

Fixes https://emqx.atlassian.net/browse/EMQX-11834
Thales Macedo Garitezi преди 2 години
родител
ревизия
76d242df9b
променени са 3 файла, в които са добавени 59 реда и са изтрити 5 реда
  1. 13 3
      apps/emqx_bridge/src/emqx_bridge_v2_api.erl
  2. 45 2
      apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
  3. 1 0
      changes/ce/fix-12472.en.md

+ 13 - 3
apps/emqx_bridge/src/emqx_bridge_v2_api.erl

@@ -747,7 +747,7 @@ schema("/source_types") ->
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 
 
 handle_list(ConfRootKey) ->
 handle_list(ConfRootKey) ->
-    Nodes = emqx:running_nodes(),
+    Nodes = nodes_supporting_bpapi_version(6),
     NodeReplies = emqx_bridge_proto_v6:v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey),
     NodeReplies = emqx_bridge_proto_v6:v2_list_bridges_on_nodes_v6(Nodes, ConfRootKey),
     case is_ok(NodeReplies) of
     case is_ok(NodeReplies) of
         {ok, NodeBridges} ->
         {ok, NodeBridges} ->
@@ -942,7 +942,7 @@ is_ok(ResL) ->
 %% bridge helpers
 %% bridge helpers
 -spec lookup_from_all_nodes(emqx_bridge_v2:root_cfg_key(), _, _, _) -> _.
 -spec lookup_from_all_nodes(emqx_bridge_v2:root_cfg_key(), _, _, _) -> _.
 lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
 lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
-    Nodes = emqx:running_nodes(),
+    Nodes = nodes_supporting_bpapi_version(6),
     case
     case
         is_ok(
         is_ok(
             emqx_bridge_proto_v6:v2_lookup_from_all_nodes_v6(
             emqx_bridge_proto_v6:v2_lookup_from_all_nodes_v6(
@@ -959,7 +959,7 @@ lookup_from_all_nodes(ConfRootKey, BridgeType, BridgeName, SuccCode) ->
     end.
     end.
 
 
 get_metrics_from_all_nodes(ConfRootKey, Type, Name) ->
 get_metrics_from_all_nodes(ConfRootKey, Type, Name) ->
-    Nodes = emqx:running_nodes(),
+    Nodes = nodes_supporting_bpapi_version(6),
     Result = maybe_unwrap(
     Result = maybe_unwrap(
         emqx_bridge_proto_v6:v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, Type, Name)
         emqx_bridge_proto_v6:v2_get_metrics_from_all_nodes_v6(Nodes, ConfRootKey, Type, Name)
     ),
     ),
@@ -1058,6 +1058,16 @@ supported_versions(_Call) -> bpapi_version_range(6, 6).
 bpapi_version_range(From, To) ->
 bpapi_version_range(From, To) ->
     lists:seq(From, To).
     lists:seq(From, To).
 
 
+nodes_supporting_bpapi_version(Vsn) ->
+    [
+        N
+     || N <- emqx:running_nodes(),
+        case emqx_bpapi:supported_version(N, ?BPAPI_NAME) of
+            undefined -> false;
+            NVsn when is_number(NVsn) -> NVsn >= Vsn
+        end
+    ].
+
 maybe_unwrap({error, not_implemented}) ->
 maybe_unwrap({error, not_implemented}) ->
     {error, not_implemented};
     {error, not_implemented};
 maybe_unwrap(RpcMulticallResult) ->
 maybe_unwrap(RpcMulticallResult) ->

+ 45 - 2
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -106,7 +106,6 @@
 -define(KAFKA_BRIDGE_UPDATE(Name, Connector),
 -define(KAFKA_BRIDGE_UPDATE(Name, Connector),
     maps:without([<<"name">>, <<"type">>], ?KAFKA_BRIDGE(Name, Connector))
     maps:without([<<"name">>, <<"type">>], ?KAFKA_BRIDGE(Name, Connector))
 ).
 ).
--define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?ACTION_CONNECTOR_NAME)).
 
 
 -define(SOURCE_TYPE_STR, "mqtt").
 -define(SOURCE_TYPE_STR, "mqtt").
 -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
 -define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
@@ -1477,7 +1476,7 @@ t_cluster_later_join_metrics(Config) ->
             ?assertMatch(
             ?assertMatch(
                 {ok, 200, #{
                 {ok, 200, #{
                     <<"metrics">> := #{<<"success">> := _},
                     <<"metrics">> := #{<<"success">> := _},
-                    <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _]
+                    <<"node_metrics">> := [#{<<"metrics">> := #{}} | _]
                 }},
                 }},
                 request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
                 request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
             ),
             ),
@@ -1512,3 +1511,47 @@ t_raw_config_response_defaults(Config) ->
         )
         )
     ),
     ),
     ok.
     ok.
+
+t_older_version_nodes_in_cluster(matrix) ->
+    [
+        [cluster, actions],
+        [cluster, sources]
+    ];
+t_older_version_nodes_in_cluster(Config) ->
+    [_, Kind | _] = group_path(Config),
+    PrimaryNode = ?config(node, Config),
+    OtherNode = maybe_get_other_node(Config),
+    ?assertNotEqual(OtherNode, PrimaryNode),
+    Name = atom_to_binary(?FUNCTION_NAME),
+    ?check_trace(
+        begin
+            #{api_root_key := APIRootKey} = get_common_values(Kind, Name),
+            erpc:call(PrimaryNode, fun() ->
+                meck:new(emqx_bpapi, [no_history, passthrough, no_link]),
+                meck:expect(emqx_bpapi, supported_version, fun(N, Api) ->
+                    case N =:= OtherNode of
+                        true -> 1;
+                        false -> meck:passthrough([N, Api])
+                    end
+                end)
+            end),
+            erpc:call(OtherNode, fun() ->
+                meck:new(emqx_bridge_v2, [no_history, passthrough, no_link]),
+                meck:expect(emqx_bridge_v2, list, fun(_ConfRootKey) ->
+                    error(should_not_be_called)
+                end)
+            end),
+            ?assertMatch(
+                {ok, 200, _},
+                request_json(
+                    get,
+                    uri([APIRootKey]),
+                    Config
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+
+    ok.

+ 1 - 0
changes/ce/fix-12472.en.md

@@ -0,0 +1 @@
+Fixed an issue that could lead to some read operations on `/api/v5/actions/` and `/api/v5/sources/` to return 500 while rolling upgrades are underway.