Procházet zdrojové kódy

Merge remote-tracking branch 'origin/release-50' into 0526-ci-delete-otp-24-from-standalone-app-test

Zaiming (Stone) Shi před 2 roky
rodič
revize
cc5b4d3748
40 změnil soubory, kde provedl 491 přidání a 171 odebrání
  1. 2 2
      Makefile
  2. 1 1
      apps/emqx/include/emqx_release.hrl
  3. 1 1
      apps/emqx/src/emqx_rpc.erl
  4. 4 4
      apps/emqx_authz/src/emqx_authz_api_mnesia.erl
  5. 8 4
      apps/emqx_bridge/src/emqx_bridge_api.erl
  6. 22 8
      apps/emqx_bridge/src/emqx_bridge_resource.erl
  7. 2 21
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  8. 8 21
      apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl
  9. 93 7
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  10. 2 4
      apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl
  11. 2 1
      apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl
  12. 4 3
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl
  13. 10 4
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
  14. 28 33
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
  15. 1 1
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src
  16. 2 2
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl
  17. 2 4
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  18. 2 1
      apps/emqx_connector/src/emqx_connector_http.erl
  19. 1 1
      apps/emqx_ctl/src/emqx_ctl.app.src
  20. 1 1
      apps/emqx_ctl/src/emqx_ctl.erl
  21. 4 2
      apps/emqx_gateway/src/emqx_gateway_api_clients.erl
  22. 27 13
      apps/emqx_management/src/emqx_mgmt_api.erl
  23. 2 2
      apps/emqx_management/src/emqx_mgmt_api_alarms.erl
  24. 2 2
      apps/emqx_management/src/emqx_mgmt_api_topics.erl
  25. 1 1
      apps/emqx_oracle/rebar.config
  26. 1 1
      apps/emqx_oracle/src/emqx_oracle.app.src
  27. 9 9
      apps/emqx_oracle/src/emqx_oracle.erl
  28. 11 4
      apps/emqx_oracle/src/emqx_oracle_schema.erl
  29. 1 1
      apps/emqx_resource/include/emqx_resource.hrl
  30. 4 0
      apps/emqx_resource/src/emqx_resource_manager.erl
  31. 30 12
      apps/emqx_resource/src/schema/emqx_resource_schema.erl
  32. 3 0
      apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
  33. 1 0
      changes/ce/fix-10760.en.md
  34. 1 0
      changes/ce/fix-10817.en.md
  35. 177 0
      changes/e5.0.4.en.md
  36. 1 0
      changes/ee/fix-10741.en.md
  37. 5 0
      rel/i18n/emqx_bridge_oracle.hocon
  38. 5 0
      rel/i18n/emqx_oracle.hocon
  39. 5 0
      rel/i18n/zh/emqx_bridge_oracle.hocon
  40. 5 0
      rel/i18n/zh/emqx_oracle.hocon

+ 2 - 2
Makefile

@@ -15,8 +15,8 @@ endif
 
 # Dashbord version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.2.4-1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.0.7-beta.3
+export EMQX_DASHBOARD_VERSION ?= v1.2.5
+export EMQX_EE_DASHBOARD_VERSION ?= e1.0.7
 
 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
 # In make 4.4+, for backward-compatibility the value from the original environment is used.

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.0.25").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.0.4-alpha.2").
+-define(EMQX_RELEASE_EE, "5.0.4").
 
 %% the HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 1 - 1
apps/emqx/src/emqx_rpc.erl

@@ -147,7 +147,7 @@ unwrap_erpc({throw, A}) ->
     {error, A};
 unwrap_erpc({error, {exception, Err, _Stack}}) ->
     {error, Err};
-unwrap_erpc({error, {exit, Err}}) ->
+unwrap_erpc({exit, Err}) ->
     {error, Err};
 unwrap_erpc({error, {erpc, Err}}) ->
     {error, Err}.

+ 4 - 4
apps/emqx_authz/src/emqx_authz_api_mnesia.erl

@@ -423,8 +423,8 @@ users(get, #{query_string := QueryString}) ->
     of
         {error, page_limit_invalid} ->
             {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, {badrpc, R}} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
             {500, #{code => <<"NODE_DOWN">>, message => Message}};
         Result ->
             {200, Result}
@@ -459,8 +459,8 @@ clients(get, #{query_string := QueryString}) ->
     of
         {error, page_limit_invalid} ->
             {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, {badrpc, R}} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
             {500, #{code => <<"NODE_DOWN">>, message => Message}};
         Result ->
             {200, Result}

+ 8 - 4
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -760,7 +760,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
     }).
 
 format_bridge_metrics(Bridges) ->
-    NodeMetrics = collect_metrics(Bridges),
+    FilteredBridges = lists:filter(
+        fun
+            ({_Node, Metric}) when is_map(Metric) -> true;
+            (_) -> false
+        end,
+        Bridges
+    ),
+    NodeMetrics = collect_metrics(FilteredBridges),
     #{
         metrics => aggregate_metrics(NodeMetrics),
         node_metrics => NodeMetrics
@@ -919,9 +926,6 @@ fill_defaults(Type, RawConf) ->
 pack_bridge_conf(Type, RawConf) ->
     #{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
 
-%% Hide webhook's resource_opts.request_timeout from user.
-filter_raw_conf(<<"webhook">>, RawConf0) ->
-    emqx_utils_maps:deep_remove([<<"resource_opts">>, <<"request_timeout">>], RawConf0);
 filter_raw_conf(_TypeBin, RawConf) ->
     RawConf.
 

+ 22 - 8
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -57,11 +57,6 @@
     (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
 ).
 
-%% [FIXME] this has no place here, it's used in parse_confs/3, which should
-%% rather delegate to a behavior callback than implementing domain knowledge
-%% here (reversed dependency)
--define(INSERT_TABLET_PATH, "/rest/v2/insertTablet").
-
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
 bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
@@ -316,7 +311,6 @@ parse_confs(
         url := Url,
         method := Method,
         headers := Headers,
-        request_timeout := ReqTimeout,
         max_retries := Retry
     } = Conf
 ) ->
@@ -330,6 +324,10 @@ parse_confs(
                 Reason1 = emqx_utils:readable_error_msg(Reason),
                 invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
         end,
+    RequestTimeout = emqx_utils_maps:deep_get(
+        [resource_opts, request_timeout],
+        Conf
+    ),
     Conf#{
         base_url => BaseUrl1,
         request =>
@@ -338,11 +336,16 @@ parse_confs(
                 method => Method,
                 body => maps:get(body, Conf, undefined),
                 headers => Headers,
-                request_timeout => ReqTimeout,
+                request_timeout => RequestTimeout,
                 max_retries => Retry
             }
     };
 parse_confs(<<"iotdb">>, Name, Conf) ->
+    %% [FIXME] this has no place here, it's used in parse_confs/3, which should
+    %% rather delegate to a behavior callback than implementing domain knowledge
+    %% here (reversed dependency)
+    InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
+    InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
     #{
         base_url := BaseURL,
         authentication :=
@@ -352,10 +355,21 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
             }
     } = Conf,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
+    %% This version atom correspond to the macro ?VSN_1_1_X in
+    %% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but
+    %% this cannot be done without introducing a dependency on the
+    %% emqx_iotdb_bridge app (which is an EE app).
+    DefaultIOTDBBridge = 'v1.1.x',
+    Version = maps:get(iotdb_version, Conf, DefaultIOTDBBridge),
+    InsertTabletPath =
+        case Version of
+            DefaultIOTDBBridge -> InsertTabletPathV2;
+            _ -> InsertTabletPathV1
+        end,
     WebhookConfig =
         Conf#{
             method => <<"post">>,
-            url => <<BaseURL/binary, ?INSERT_TABLET_PATH>>,
+            url => <<BaseURL/binary, InsertTabletPath/binary>>,
             headers => [
                 {<<"Content-type">>, <<"application/json">>},
                 {<<"Authorization">>, BasicToken}

+ 2 - 21
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -251,25 +251,6 @@ node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
 
 webhook_bridge_converter(Conf0, _HoconOpts) ->
-    Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
+    emqx_bridge_compatible_config:upgrade_pre_ee(
         Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
-    ),
-    case Conf1 of
-        undefined ->
-            undefined;
-        _ ->
-            maps:map(
-                fun(_Name, Conf) ->
-                    do_convert_webhook_config(Conf)
-                end,
-                Conf1
-            )
-    end.
-
-%% We hide resource_opts.request_timeout from user.
-do_convert_webhook_config(
-    #{<<"request_timeout">> := ReqT, <<"resource_opts">> := ResOpts} = Conf
-) ->
-    Conf#{<<"resource_opts">> => ResOpts#{<<"request_timeout">> => ReqT}};
-do_convert_webhook_config(Conf) ->
-    Conf.
+    ).

+ 8 - 21
apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl

@@ -40,15 +40,12 @@ fields("put") ->
 fields("get") ->
     emqx_bridge_schema:status_fields() ++ fields("post");
 fields("creation_opts") ->
-    [
-        hidden_request_timeout()
-        | lists:filter(
-            fun({K, _V}) ->
-                not lists:member(K, unsupported_opts())
-            end,
-            emqx_resource_schema:fields("creation_opts")
-        )
-    ].
+    lists:filter(
+        fun({K, _V}) ->
+            not lists:member(K, unsupported_opts())
+        end,
+        emqx_resource_schema:fields("creation_opts")
+    ).
 
 desc("config") ->
     ?DESC("desc_config");
@@ -144,6 +141,7 @@ request_config() ->
                 emqx_schema:duration_ms(),
                 #{
                     default => <<"15s">>,
+                    deprecated => {since, "v5.0.26"},
                     desc => ?DESC("config_request_timeout")
                 }
             )}
@@ -166,8 +164,7 @@ unsupported_opts() ->
     [
         enable_batch,
         batch_size,
-        batch_time,
-        request_timeout
+        batch_time
     ].
 
 %%======================================================================================
@@ -194,13 +191,3 @@ name_field() ->
 
 method() ->
     enum([post, put, get, delete]).
-
-hidden_request_timeout() ->
-    {request_timeout,
-        mk(
-            hoconsc:union([infinity, emqx_schema:duration_ms()]),
-            #{
-                required => false,
-                importance => ?IMPORTANCE_HIDDEN
-            }
-        )}.

+ 93 - 7
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -79,7 +79,8 @@ groups() ->
     SingleOnlyTests = [
         t_broken_bpapi_vsn,
         t_old_bpapi_vsn,
-        t_bridges_probe
+        t_bridges_probe,
+        t_auto_restart_interval
     ],
     ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
     [
@@ -551,6 +552,89 @@ t_http_crud_apis(Config) ->
 
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
 
+t_auto_restart_interval(Config) ->
+    Port = ?config(port, Config),
+    %% assert we there's no bridges at first
+    {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
+
+    meck:new(emqx_resource, [passthrough]),
+    meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),
+
+    %% then we add a webhook bridge, using POST
+    %% POST /bridges/ will create a bridge
+    URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
+    BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
+        <<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
+    },
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {ok, 201, #{
+                    <<"type">> := ?BRIDGE_TYPE_HTTP,
+                    <<"name">> := Name,
+                    <<"enable">> := true,
+                    <<"status">> := _,
+                    <<"node_status">> := [_ | _],
+                    <<"url">> := URL1
+                }},
+                request_json(
+                    post,
+                    uri(["bridges"]),
+                    BridgeParams,
+                    Config
+                )
+            ),
+            {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
+            {ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(resource_auto_reconnect, Trace0),
+            ?assertMatch([#{}], Trace),
+            ok
+        end
+    ),
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
+    {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
+
+    %% auto_retry_interval=infinity
+    BridgeParams1 = BridgeParams#{
+        <<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
+    },
+    ?check_trace(
+        begin
+            ?assertMatch(
+                {ok, 201, #{
+                    <<"type">> := ?BRIDGE_TYPE_HTTP,
+                    <<"name">> := Name,
+                    <<"enable">> := true,
+                    <<"status">> := _,
+                    <<"node_status">> := [_ | _],
+                    <<"url">> := URL1
+                }},
+                request_json(
+                    post,
+                    uri(["bridges"]),
+                    BridgeParams1,
+                    Config
+                )
+            ),
+            {ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
+            ?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(resource_auto_reconnect, Trace0),
+            ?assertMatch([], Trace),
+            ok
+        end
+    ),
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
+    {ok, 200, []} = request_json(get, uri(["bridges"]), Config),
+    meck:unload(emqx_resource).
+
 t_http_bridges_local_topic(Config) ->
     Port = ?config(port, Config),
     %% assert we there's no bridges at first
@@ -1307,18 +1391,20 @@ t_inconsistent_webhook_request_timeouts(Config) ->
                 <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
             }
         ),
-    {ok, 201, #{
-        <<"request_timeout">> := <<"1s">>,
-        <<"resource_opts">> := ResourceOpts
-    }} =
+    %% root request_timeout is deprecated for bridge.
+    {ok, 201,
+        #{
+            <<"resource_opts">> := ResourceOpts
+        } = Response} =
         request_json(
             post,
             uri(["bridges"]),
             BadBridgeParams,
             Config
         ),
-    ?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)),
-    validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
+    ?assertNot(maps:is_key(<<"request_timeout">>, Response)),
+    ?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
+    validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
     ok.
 
 t_cluster_later_join_metrics(Config) ->

+ 2 - 4
apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl

@@ -65,15 +65,13 @@ webhook_config_test() ->
                 <<"the_name">> :=
                     #{
                         <<"method">> := get,
-                        <<"request_timeout">> := RequestTime,
                         <<"resource_opts">> := ResourceOpts,
                         <<"body">> := <<"${payload}">>
                     }
             }
         }
     } = check(Conf3),
-    ?assertEqual(60_000, RequestTime),
-    ?assertMatch(#{<<"request_timeout">> := 60_000}, ResourceOpts),
+    ?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
     ok.
 
 up(#{<<"bridges">> := Bridges0} = Conf0) ->
@@ -196,7 +194,7 @@ full_webhook_v5019_hocon() ->
     "      pool_type = \"random\"\n"
     "      request_timeout = \"1m\"\n"
     "      resource_opts = {\n"
-    "        request_timeout = \"7s\"\n"
+    "        request_timeout = \"infinity\"\n"
     "      }\n"
     "      ssl {\n"
     "        ciphers = \"\"\n"

+ 2 - 1
apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl

@@ -5,7 +5,8 @@
 -ifndef(EMQX_BRIDGE_IOTDB_HRL).
 -define(EMQX_BRIDGE_IOTDB_HRL, true).
 
--define(VSN_1_X, 'v1.x').
+-define(VSN_1_1_X, 'v1.1.x').
+-define(VSN_1_0_X, 'v1.0.x').
 -define(VSN_0_13_X, 'v0.13.x').
 
 -endif.

+ 4 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -109,10 +109,10 @@ basic_config() ->
             )},
         {iotdb_version,
             mk(
-                hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
+                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
                 #{
                     desc => ?DESC("config_iotdb_version"),
-                    default => ?VSN_1_X
+                    default => ?VSN_1_1_X
                 }
             )}
     ] ++ resource_creation_opts() ++
@@ -130,6 +130,7 @@ request_config() ->
             mk(
                 emqx_schema:url(),
                 #{
+                    required => true,
                     desc => ?DESC("config_base_url")
                 }
             )},
@@ -217,7 +218,7 @@ conn_bridge_example(_Method, Type) ->
         is_aligned => false,
         device_id => <<"my_device">>,
         base_url => <<"http://iotdb.local:18080/">>,
-        iotdb_version => ?VSN_1_X,
+        iotdb_version => ?VSN_1_1_X,
         connect_timeout => <<"15s">>,
         pool_type => <<"random">>,
         pool_size => 8,

+ 10 - 4
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -280,7 +280,7 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) ->
     Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
     IsAligned = maps:get(is_aligned, State, false),
     DeviceId = device_id(Message, State),
-    IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
+    IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
     Payload = make_list(maps:get(payload, Message)),
     PreProcessedData = preproc_data_list(Payload),
     DataList = proc_data(PreProcessedData, Message),
@@ -349,15 +349,21 @@ insert_value(1, Data, [Value | Values]) ->
 insert_value(Index, Data, [Value | Values]) ->
     [[null | Value] | insert_value(Index - 1, Data, Values)].
 
-iotdb_field_key(is_aligned, ?VSN_1_X) ->
+iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
+    <<"is_aligned">>;
+iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
     <<"isAligned">>;
-iotdb_field_key(device_id, ?VSN_1_X) ->
+iotdb_field_key(device_id, ?VSN_1_1_X) ->
+    <<"device">>;
+iotdb_field_key(device_id, ?VSN_1_0_X) ->
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
     <<"deviceId">>;
-iotdb_field_key(data_types, ?VSN_1_X) ->
+iotdb_field_key(data_types, ?VSN_1_1_X) ->
+    <<"data_types">>;
+iotdb_field_key(data_types, ?VSN_1_0_X) ->
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
     <<"dataTypes">>.

+ 28 - 33
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -994,36 +994,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
         Assignments
     ).
 
-setup_group_subscriber_spy(Node) ->
+setup_group_subscriber_spy_fn() ->
     TestPid = self(),
-    ok = erpc:call(
-        Node,
-        fun() ->
-            ok = meck:new(brod_group_subscriber_v2, [
-                passthrough, no_link, no_history, non_strict
-            ]),
-            ok = meck:expect(
-                brod_group_subscriber_v2,
-                assignments_received,
-                fun(Pid, MemberId, GenerationId, TopicAssignments) ->
-                    ?tp(
-                        kafka_assignment,
-                        #{
-                            node => node(),
-                            pid => Pid,
-                            member_id => MemberId,
-                            generation_id => GenerationId,
-                            topic_assignments => TopicAssignments
-                        }
-                    ),
-                    TestPid !
-                        {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
-                    meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
-                end
-            ),
-            ok
-        end
-    ).
+    fun() ->
+        ok = meck:new(brod_group_subscriber_v2, [
+            passthrough, no_link, no_history, non_strict
+        ]),
+        ok = meck:expect(
+            brod_group_subscriber_v2,
+            assignments_received,
+            fun(Pid, MemberId, GenerationId, TopicAssignments) ->
+                ?tp(
+                    kafka_assignment,
+                    #{
+                        node => node(),
+                        pid => Pid,
+                        member_id => MemberId,
+                        generation_id => GenerationId,
+                        topic_assignments => TopicAssignments
+                    }
+                ),
+                TestPid !
+                    {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
+                meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
+            end
+        ),
+        ok
+    end.
 
 wait_for_cluster_rpc(Node) ->
     %% need to wait until the config handler is ready after
@@ -1067,6 +1064,7 @@ cluster(Config) ->
             _ ->
                 ct_slave
         end,
+    ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
     Cluster = emqx_common_test_helpers:emqx_cluster(
         [core, core],
         [
@@ -1080,6 +1078,7 @@ cluster(Config) ->
             {env_handler, fun
                 (emqx) ->
                     application:set_env(emqx, boot_modules, [broker, router]),
+                    ExtraEnvHandlerHook(),
                     ok;
                 (emqx_conf) ->
                     ok;
@@ -1680,7 +1679,6 @@ t_cluster_group(Config) ->
                     Nodes
                 )
             end),
-            lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),
@@ -1757,7 +1755,6 @@ t_node_joins_existing_cluster(Config) ->
                 ct:pal("stopping ~p", [N1]),
                 ok = emqx_common_test_helpers:stop_slave(N1)
             end),
-            setup_group_subscriber_spy(N1),
             {{ok, _}, {ok, _}} =
                 ?wait_async_action(
                     erpc:call(N1, fun() ->
@@ -1801,7 +1798,6 @@ t_node_joins_existing_cluster(Config) ->
                 ct:pal("stopping ~p", [N2]),
                 ok = emqx_common_test_helpers:stop_slave(N2)
             end),
-            setup_group_subscriber_spy(N2),
             Nodes = [N1, N2],
             wait_for_cluster_rpc(N2),
 
@@ -1902,7 +1898,6 @@ t_cluster_node_down(Config) ->
                     Nodes
                 )
             end),
-            lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
             {ok, SRef0} = snabbkaffe:subscribe(
                 ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                 length(Nodes),

+ 1 - 1
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_oracle, [
     {description, "EMQX Enterprise Oracle Database Bridge"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 2
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl

@@ -20,7 +20,7 @@
 ]).
 
 -define(DEFAULT_SQL, <<
-    "insert into t_mqtt_msg(msgid, topic, qos, payload)"
+    "insert into t_mqtt_msgs(msgid, topic, qos, payload) "
     "values (${id}, ${topic}, ${qos}, ${payload})"
 >>).
 
@@ -41,7 +41,7 @@ values(_Method) ->
         name => <<"foo">>,
         server => <<"127.0.0.1:1521">>,
         pool_size => 8,
-        database => <<"ORCL">>,
+        service_name => <<"ORCL">>,
         sid => <<"ORCL">>,
         username => <<"root">>,
         password => <<"******">>,

+ 2 - 4
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -14,7 +14,7 @@
 
 -define(BRIDGE_TYPE_BIN, <<"oracle">>).
 -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]).
--define(DATABASE, "XE").
+-define(SID, "XE").
 -define(RULE_TOPIC, "mqtt/rule").
 % -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
 
@@ -196,7 +196,6 @@ oracle_config(TestCase, _ConnectionType, Config) ->
         io_lib:format(
             "bridges.oracle.~s {\n"
             "  enable = true\n"
-            "  database = \"~s\"\n"
             "  sid = \"~s\"\n"
             "  server = \"~s\"\n"
             "  username = \"system\"\n"
@@ -215,8 +214,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
             "}\n",
             [
                 Name,
-                ?DATABASE,
-                ?DATABASE,
+                ?SID,
                 ServerURL,
                 sql_insert_template_for_bridge()
             ]

+ 2 - 1
apps/emqx_connector/src/emqx_connector_http.erl

@@ -53,6 +53,7 @@
 ]).
 
 -define(DEFAULT_PIPELINE_SIZE, 100).
+-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
 
 %%=====================================================================
 %% Hocon schema
@@ -467,7 +468,7 @@ preprocess_request(
         path => emqx_plugin_libs_rule:preproc_tmpl(Path),
         body => maybe_preproc_tmpl(body, Req),
         headers => wrap_auth_header(preproc_headers(Headers)),
-        request_timeout => maps:get(request_timeout, Req, 30000),
+        request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
         max_retries => maps:get(max_retries, Req, 2)
     }.
 

+ 1 - 1
apps/emqx_ctl/src/emqx_ctl.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ctl, [
     {description, "Backend for emqx_ctl script"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {mod, {emqx_ctl_app, []}},
     {applications, [

+ 1 - 1
apps/emqx_ctl/src/emqx_ctl.erl

@@ -228,7 +228,7 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
             ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
             {reply, ok, next_seq(State)};
         [[OriginSeq] | _] ->
-            ?LOG_WARNING(#{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
+            ?LOG_WARNING(#{msg => "CMD_overridden", cmd => Cmd, mf => MF}),
             true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}),
             {reply, ok, State}
     end;

+ 4 - 2
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -133,8 +133,10 @@ clients(get, #{
         case Result of
             {error, page_limit_invalid} ->
                 {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-            {error, Node, {badrpc, R}} ->
-                Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+            {error, Node, Error} ->
+                Message = list_to_binary(
+                    io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])
+                ),
                 {500, #{code => <<"NODE_DOWN">>, message => Message}};
             Response ->
                 {200, Response}

+ 27 - 13
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -134,8 +134,8 @@ do_node_query(
     ResultAcc
 ) ->
     case do_query(Node, QueryState) of
-        {error, {badrpc, R}} ->
-            {error, Node, {badrpc, R}};
+        {error, Error} ->
+            {error, Node, Error};
         {Rows, NQueryState = #{complete := Complete}} ->
             case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
                 {enough, NResultAcc} ->
@@ -179,8 +179,8 @@ do_cluster_query(
     ResultAcc
 ) ->
     case do_query(Node, QueryState) of
-        {error, {badrpc, R}} ->
-            {error, Node, {badrpc, R}};
+        {error, Error} ->
+            {error, Node, Error};
         {Rows, NQueryState = #{complete := Complete}} ->
             case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
                 {enough, NResultAcc} ->
@@ -275,7 +275,7 @@ do_query(Node, QueryState) when Node =:= node() ->
     do_select(Node, QueryState);
 do_query(Node, QueryState) ->
     case
-        rpc:call(
+        catch rpc:call(
             Node,
             ?MODULE,
             do_query,
@@ -284,6 +284,7 @@ do_query(Node, QueryState) ->
         )
     of
         {badrpc, _} = R -> {error, R};
+        {'EXIT', _} = R -> {error, R};
         Ret -> Ret
     end.
 
@@ -298,15 +299,24 @@ do_select(
 ) ->
     QueryState = maybe_apply_total_query(Node, QueryState0),
     Result =
-        case maps:get(continuation, QueryState, undefined) of
-            undefined ->
-                ets:select(Tab, Ms, Limit);
-            Continuation ->
-                %% XXX: Repair is necessary because we pass Continuation back
-                %% and forth through the nodes in the `do_cluster_query`
-                ets:select(ets:repair_continuation(Continuation, Ms))
+        try
+            case maps:get(continuation, QueryState, undefined) of
+                undefined ->
+                    ets:select(Tab, Ms, Limit);
+                Continuation ->
+                    %% XXX: Repair is necessary because we pass Continuation back
+                    %% and forth through the nodes in the `do_cluster_query`
+                    ets:select(ets:repair_continuation(Continuation, Ms))
+            end
+        catch
+            exit:_ = Exit ->
+                {error, Exit};
+            Type:Reason:Stack ->
+                {error, #{exception => Type, reason => Reason, stacktrace => Stack}}
         end,
     case Result of
+        {error, _} ->
+            {[], mark_complete(QueryState)};
         {Rows, '$end_of_table'} ->
             NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
             {NRows, mark_complete(QueryState)};
@@ -354,7 +364,11 @@ counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
     [{MatchHead, Conditions, _Return}] = Ms,
     CountingMs = [{MatchHead, Conditions, [true]}],
     fun(Tab) ->
-        ets:select_count(Tab, CountingMs)
+        try
+            ets:select_count(Tab, CountingMs)
+        catch
+            _Type:_Reason -> 0
+        end
     end;
 counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
     %% XXX: Calculating the total number for a fuzzy searching is very very expensive

+ 2 - 2
apps/emqx_management/src/emqx_mgmt_api_alarms.erl

@@ -123,8 +123,8 @@ alarms(get, #{query_string := QString}) ->
     of
         {error, page_limit_invalid} ->
             {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, {badrpc, R}} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
             {500, #{code => <<"NODE_DOWN">>, message => Message}};
         Response ->
             {200, Response}

+ 2 - 2
apps/emqx_management/src/emqx_mgmt_api_topics.erl

@@ -120,8 +120,8 @@ do_list(Params) ->
     of
         {error, page_limit_invalid} ->
             {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
-        {error, Node, {badrpc, R}} ->
-            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
             {500, #{code => <<"NODE_DOWN">>, message => Message}};
         Response ->
             {200, Response}

+ 1 - 1
apps/emqx_oracle/rebar.config

@@ -1,7 +1,7 @@
 %% -*- mode: erlang; -*-
 
 {erl_opts, [debug_info]}.
-{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.4"}}}
+{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.5"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        ]}.

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [
         kernel,

+ 9 - 9
apps/emqx_oracle/src/emqx_oracle.erl

@@ -75,8 +75,6 @@ on_start(
     InstId,
     #{
         server := Server,
-        database := DB,
-        sid := Sid,
         username := User
     } = Config
 ) ->
@@ -91,15 +89,19 @@ on_start(
     jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),
 
     #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()),
-    ServiceName = maps:get(<<"service_name">>, Config, Sid),
+    Sid = maps:get(sid, Config, ""),
+    ServiceName =
+        case maps:get(service_name, Config, undefined) of
+            undefined -> undefined;
+            ServiceName0 -> emqx_plugin_libs_rule:str(ServiceName0)
+        end,
     Options = [
         {host, Host},
         {port, Port},
         {user, emqx_plugin_libs_rule:str(User)},
-        {password, emqx_secret:wrap(maps:get(password, Config, ""))},
+        {password, jamdb_secret:wrap(maps:get(password, Config, ""))},
         {sid, emqx_plugin_libs_rule:str(Sid)},
-        {service_name, emqx_plugin_libs_rule:str(ServiceName)},
-        {database, DB},
+        {service_name, ServiceName},
         {pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
         {timeout, ?OPT_TIMEOUT},
         {app_name, "EMQX Data To Oracle Database Action"}
@@ -258,9 +260,7 @@ oracle_host_options() ->
     ?ORACLE_HOST_OPTIONS.
 
 connect(Opts) ->
-    Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
-    NewOpts = lists:keyreplace(password, 1, Opts, {password, Password}),
-    jamdb_oracle:start_link(NewOpts).
+    jamdb_oracle:start_link(Opts).
 
 sql_query_to_str(SqlQuery) ->
     emqx_plugin_libs_rule:str(SqlQuery).

+ 11 - 4
apps/emqx_oracle/src/emqx_oracle_schema.erl

@@ -19,9 +19,11 @@ roots() ->
     [{config, #{type => hoconsc:ref(?REF_MODULE, config)}}].
 
 fields(config) ->
-    [{server, server()}, {sid, fun sid/1}] ++
-        emqx_connector_schema_lib:relational_db_fields() ++
-        emqx_connector_schema_lib:prepare_statement_fields().
+    Fields =
+        [{server, server()}, {sid, fun sid/1}, {service_name, fun service_name/1}] ++
+            emqx_connector_schema_lib:relational_db_fields() ++
+            emqx_connector_schema_lib:prepare_statement_fields(),
+    proplists:delete(database, Fields).
 
 server() ->
     Meta = #{desc => ?DESC(?REF_MODULE, "server")},
@@ -29,5 +31,10 @@ server() ->
 
 sid(type) -> binary();
 sid(desc) -> ?DESC(?REF_MODULE, "sid");
-sid(required) -> true;
+sid(required) -> false;
 sid(_) -> undefined.
+
+service_name(type) -> binary();
+service_name(desc) -> ?DESC(?REF_MODULE, "service_name");
+service_name(required) -> false;
+service_name(_) -> undefined.

+ 1 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -66,7 +66,7 @@
     start_after_created => boolean(),
     %% If the resource disconnected, we can set to retry starting the resource
     %% periodically.
-    auto_restart_interval => pos_integer(),
+    auto_restart_interval => pos_integer() | infinity,
     batch_size => pos_integer(),
     batch_time => pos_integer(),
     max_buffer_bytes => pos_integer(),

+ 4 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -389,8 +389,10 @@ handle_event(state_timeout, health_check, connected, Data) ->
 %% State: DISCONNECTED
 handle_event(enter, _OldState, disconnected = State, Data) ->
     ok = log_state_consistency(State, Data),
+    ?tp(resource_disconnected_enter, #{}),
     {keep_state_and_data, retry_actions(Data)};
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
+    ?tp(resource_auto_reconnect, #{}),
     start_resource(Data, undefined);
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
@@ -450,6 +452,8 @@ retry_actions(Data) ->
     case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
         undefined ->
             [];
+        infinity ->
+            [];
         RetryInterval ->
             [{state_timeout, RetryInterval, auto_retry}]
     end.

+ 30 - 12
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -102,12 +102,14 @@ health_check_interval_range(HealthCheckInterval) when
         HealthCheckInterval =< ?HEALTH_CHECK_INTERVAL_RANGE_MAX
 ->
     ok;
-health_check_interval_range(_HealthCheckInterval) ->
-    {error, #{
-        msg => <<"Health Check Interval out of range">>,
-        min => ?HEALTH_CHECK_INTERVAL_RANGE_MIN,
-        max => ?HEALTH_CHECK_INTERVAL_RANGE_MAX
-    }}.
+health_check_interval_range(HealthCheckInterval) ->
+    Message = get_out_of_range_msg(
+        <<"Health Check Interval">>,
+        HealthCheckInterval,
+        ?HEALTH_CHECK_INTERVAL_RANGE_MIN,
+        ?HEALTH_CHECK_INTERVAL_RANGE_MAX
+    ),
+    {error, Message}.
 
 start_after_created(type) -> boolean();
 start_after_created(desc) -> ?DESC("start_after_created");
@@ -128,18 +130,22 @@ auto_restart_interval(required) -> false;
 auto_restart_interval(validator) -> fun auto_restart_interval_range/1;
 auto_restart_interval(_) -> undefined.
 
+auto_restart_interval_range(infinity) ->
+    ok;
 auto_restart_interval_range(AutoRestartInterval) when
     is_integer(AutoRestartInterval) andalso
         AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso
         AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX
 ->
     ok;
-auto_restart_interval_range(_AutoRestartInterval) ->
-    {error, #{
-        msg => <<"Auto Restart Interval out of range">>,
-        min => ?AUTO_RESTART_INTERVAL_RANGE_MIN,
-        max => ?AUTO_RESTART_INTERVAL_RANGE_MAX
-    }}.
+auto_restart_interval_range(AutoRestartInterval) ->
+    Message = get_out_of_range_msg(
+        <<"Auto Restart Interval">>,
+        AutoRestartInterval,
+        ?AUTO_RESTART_INTERVAL_RANGE_MIN,
+        ?AUTO_RESTART_INTERVAL_RANGE_MAX
+    ),
+    {error, Message}.
 
 query_mode(type) -> enum([sync, async]);
 query_mode(desc) -> ?DESC("query_mode");
@@ -206,3 +212,15 @@ buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
 buffer_seg_bytes(_) -> undefined.
 
 desc("creation_opts") -> ?DESC("creation_opts").
+
+get_value_with_unit(Value) when is_integer(Value) ->
+    <<(erlang:integer_to_binary(Value))/binary, "ms">>;
+get_value_with_unit(Value) ->
+    Value.
+
+get_out_of_range_msg(Field, Value, Min, Max) ->
+    ValueStr = get_value_with_unit(Value),
+    MinStr = get_value_with_unit(Min),
+    MaxStr = get_value_with_unit(Max),
+    <<Field/binary, " (", ValueStr/binary, ") is out of range (", MinStr/binary, " to ",
+        MaxStr/binary, ")">>.

+ 3 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -339,6 +339,9 @@ param_path_id() ->
     of
         {error, page_limit_invalid} ->
             {400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
+        {error, Node, Error} ->
+            Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
+            {500, #{code => <<"NODE_DOWN">>, message => Message}};
         Result ->
             {200, Result}
     end;

+ 1 - 0
changes/ce/fix-10760.en.md

@@ -0,0 +1 @@
+Fix Internal Error 500 that occurred sometimes when bridge statistics page was updated while a node was (re)joining the cluster.

+ 1 - 0
changes/ce/fix-10817.en.md

@@ -0,0 +1 @@
+Fix the error of not being able to configure `auto_restart_interval` as infinity

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 177 - 0
changes/e5.0.4.en.md


+ 1 - 0
changes/ee/fix-10741.en.md

@@ -0,0 +1 @@
+Fix password leaking on stacktrace for Oracle Database.

+ 5 - 0
rel/i18n/emqx_bridge_oracle.hocon

@@ -29,6 +29,11 @@ emqx_bridge_oracle {
     label = "Oracle Database Sid."
   }
 
+  service_name {
+    desc = "Service Name for Oracle Database."
+    label = "Oracle Database Service Name"
+  }
+
   config_enable {
     desc = "Enable or disable this bridge"
     label = "Enable Or Disable Bridge"

+ 5 - 0
rel/i18n/emqx_oracle.hocon

@@ -12,4 +12,9 @@ emqx_oracle {
     label = "Oracle Database Sid"
   }
 
+  service_name {
+    desc = "Service Name for Oracle Database."
+    label = "Oracle Database Service Name"
+  }
+
 }

+ 5 - 0
rel/i18n/zh/emqx_bridge_oracle.hocon

@@ -28,6 +28,11 @@ emqx_bridge_oracle {
     label = "Oracle Database Sid"
   }
 
+  service_name {
+    desc = "Oracle Database 服务名称。"
+    label = "Oracle Database 服务名称"
+  }
+
   config_enable {
     desc = "启用/禁用桥接"
     label = "启用/禁用桥接"

+ 5 - 0
rel/i18n/zh/emqx_oracle.hocon

@@ -12,4 +12,9 @@ emqx_oracle {
     label = "Oracle Database Sid"
   }
 
+  service_name {
+    desc = "Oracle Database 服务名称。"
+    label = "Oracle Database 服务名称"
+  }
+
 }