Просмотр исходного кода

Merge remote-tracking branch 'origin/release-58' into sync-release-58-20240823-021936

id 1 год назад
Родитель
Сommit
ab62bd7f0f

+ 2 - 2
Makefile

@@ -10,8 +10,8 @@ include env.sh
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.2
+export EMQX_DASHBOARD_VERSION ?= v1.10.0-beta.2
+export EMQX_EE_DASHBOARD_VERSION ?= e1.8.0-beta.4
 
 export EMQX_RELUP ?= true
 export EMQX_REL_FORM ?= tgz

+ 1 - 2
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -651,8 +651,7 @@ to_trace(#{type := Type}, _Rec) ->
 to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) ->
     to_trace(maps:remove(payload_encode, Trace), Rec#?TRACE{payload_encode = PayloadEncode});
 to_trace(#{start_at := StartAt} = Trace, Rec) ->
-    {ok, Sec} = to_system_second(StartAt),
-    to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec});
+    to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = StartAt});
 to_trace(#{end_at := EndAt} = Trace, Rec) ->
     Now = now_second(),
     case to_system_second(EndAt) of

+ 1 - 0
apps/emqx/src/emqx_trace/emqx_trace_dl.erl

@@ -53,6 +53,7 @@ update(Name, Enable) ->
 insert_new_trace(Trace) ->
     case mnesia:read(?TRACE, Trace#?TRACE.name) of
         [] ->
+            %% allow one new trace for each filter in the same second
             #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
             Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
             case mnesia:match_object(?TRACE, Match, read) of

+ 24 - 22
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -114,15 +114,7 @@ end_per_group(_Group, _Config) ->
 
 t_takeover(Config) ->
     process_flag(trap_exit, true),
-    Vsn = atom_to_list(?config(mqtt_vsn, Config)),
-    Persist =
-        case ?config(persistence_enabled, Config) of
-            true ->
-                "persistent-";
-            false ->
-                "not-persistent-"
-        end,
-    ClientId = iolist_to_binary("t_takeover-" ++ Persist ++ Vsn),
+    ClientId = make_client_id(?FUNCTION_NAME, Config),
     ClientOpts = [
         {proto_ver, ?config(mqtt_vsn, Config)},
         {clean_start, false}
@@ -153,7 +145,7 @@ t_takeover(Config) ->
             [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
             {fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]},
             [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
-            {fun stop_client/1, []}
+            {fun stop_the_last_client/1, []}
         ]),
 
     Sleep =
@@ -184,7 +176,7 @@ t_takeover(Config) ->
 
 t_takeover_willmsg(Config) ->
     process_flag(trap_exit, true),
-    ClientId = atom_to_binary(?FUNCTION_NAME),
+    ClientId = make_client_id(?FUNCTION_NAME, Config),
     WillTopic = <<ClientId/binary, <<"_willtopic">>/binary>>,
     Middle = ?CNT div 2,
     Client1Msgs = messages(ClientId, 0, Middle),
@@ -208,25 +200,27 @@ t_takeover_willmsg(Config) ->
             [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
             %% WHEN client reconnect with clean_start = false
             {fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]},
-            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs]
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs],
+            {fun stop_the_last_client/1, []}
         ]),
+    Sleep =
+        case ?config(persistence_enabled, Config) of
+            true -> 2_000;
+            false -> ?SLEEP
+        end,
 
     FCtx = lists:foldl(
         fun({Fun, Args}, Ctx) ->
             ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
             apply(Fun, [Ctx | Args])
         end,
-        #{persistence_enabled => ?config(persistence_enabled, Config)},
+        #{persistence_enabled => ?config(persistence_enabled, Config), sleep => Sleep},
         Commands
     ),
 
     #{client := [CPid2, CPidSub, CPid1]} = FCtx,
     assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
-    Sleep =
-        case ?config(persistence_enabled, Config) of
-            true -> 2_000;
-            false -> ?SLEEP
-        end,
+    ?assertReceive({'EXIT', CPid2, normal}),
     Received = [Msg || {publish, Msg} <- ?drainMailbox(Sleep)],
     ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>),
@@ -235,9 +229,6 @@ t_takeover_willmsg(Config) ->
     %% THEN will message should be received
     ?assert(IsWill),
     emqtt:stop(CPidSub),
-    emqtt:stop(CPid2),
-    ?assertReceive({'EXIT', CPid2, normal}),
-    ?assert(not is_process_alive(CPid1)),
     ok.
 
 t_takeover_willmsg_clean_session(Config) ->
@@ -992,7 +983,7 @@ publish_msg(Ctx, Msg) ->
         [_ | _] -> Ctx
     end.
 
-stop_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
+stop_the_last_client(Ctx = #{client := [CPid | _], sleep := Sleep}) ->
     ok = timer:sleep(Sleep),
     ok = emqtt:stop(CPid),
     Ctx.
@@ -1082,3 +1073,14 @@ assert_client_exit(Pid, v5, kicked) ->
     ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
 assert_client_exit(Pid, _, killed) ->
     ?assertReceive({'EXIT', Pid, killed}).
+
+make_client_id(Case, Config) ->
+    Vsn = atom_to_list(?config(mqtt_vsn, Config)),
+    Persist =
+        case ?config(persistence_enabled, Config) of
+            true ->
+                "-persistent-";
+            false ->
+                "-not-persistent-"
+        end,
+    iolist_to_binary([atom_to_binary(Case), Persist ++ Vsn]).

+ 10 - 10
apps/emqx_auth_http/test/emqx_authn_scram_restapi_SUITE.erl

@@ -157,7 +157,7 @@ t_authenticate_bad_username(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(<<"badusername">>),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(<<"badusername">>),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -182,7 +182,7 @@ t_authenticate_bad_password(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -202,7 +202,7 @@ t_authenticate_bad_password(_Config) ->
     ) = receive_packet(),
 
     {continue, ClientFinalMessage, _ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -321,7 +321,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
 
     set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     {continue, ServerFirstMessage, ServerCache} =
         emqx_authn_scram_restapi:authenticate(
@@ -334,7 +334,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
         ),
 
     {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -353,7 +353,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
             State
         ),
 
-    ok = esasl_scram:check_server_final_message(
+    ok = sasl_auth_scram:check_server_final_message(
         ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
     ),
 
@@ -410,7 +410,7 @@ init_auth(Config) ->
     State.
 
 make_user_info(Password, Algorithm, IterationCount) ->
-    {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
+    {StoredKey, ServerKey, Salt} = sasl_auth_scram:generate_authentication_info(
         Password,
         #{
             algorithm => Algorithm,
@@ -435,7 +435,7 @@ receive_packet() ->
 create_connection(Username, Password) ->
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -458,7 +458,7 @@ create_connection(Username, Password) ->
     ) = receive_packet(),
 
     {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -483,7 +483,7 @@ create_connection(Username, Password) ->
         #{'Authentication-Data' := ServerFinalMessage}
     ) = receive_packet(),
 
-    ok = esasl_scram:check_server_final_message(
+    ok = sasl_auth_scram:check_server_final_message(
         ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
     ),
     {ok, Pid}.

+ 1 - 1
apps/emqx_auth_kerberos/rebar.config

@@ -3,5 +3,5 @@
 {deps, [
     {emqx, {path, "../emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
-    {sasl_auth, {git, "https://github.com/kafka4beam/sasl_auth.git", {tag, "v2.2.0"}}}
+    {sasl_auth, "2.3.0"}
 ]}.

+ 2 - 7
apps/emqx_auth_kerberos/src/emqx_authn_kerberos.erl

@@ -17,13 +17,8 @@
     authenticate/2
 ]).
 
-create(
-    AuthenticatorID,
-    #{
-        principal := Principal,
-        keytab_file := KeyTabFile
-    }
-) ->
+create(AuthenticatorID, #{principal := Principal} = Conf) ->
+    KeyTabFile = maps:get(keytab_file, Conf, <<"">>),
     KeyTabPath = resolve_keytab(KeyTabFile),
     %% kinit is not necessary for server because the keytab file
     %% must be the smae as default keytab

+ 2 - 1
apps/emqx_auth_kerberos/test/emqx_authn_kerberos_SUITE.erl

@@ -70,7 +70,8 @@ t_create(_Config) ->
         emqx_authn_chains:list_authenticators(?GLOBAL).
 
 t_create_invalid(_Config) ->
-    InvalidConfig0 = raw_config(),
+    %% cover the case when keytab_file is not provided
+    InvalidConfig0 = maps:remove(<<"keytab_file">>, raw_config()),
     InvalidConfig = InvalidConfig0#{<<"principal">> := ?INVALID_SVR_PRINCIPAL},
 
     {error, _} = emqx:update_config(

+ 1 - 1
apps/emqx_auth_mnesia/mix.exs

@@ -27,7 +27,7 @@ defmodule EMQXAuthMnesia.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_auth, in_umbrella: true},
-      UMP.common_dep(:esasl),
+      UMP.common_dep(:sasl_auth),
     ]
   end
 end

+ 1 - 1
apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src

@@ -9,7 +9,7 @@
         stdlib,
         emqx,
         emqx_auth,
-        esasl
+        sasl_auth
     ]},
     {env, []},
     {modules, []},

+ 2 - 2
apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl

@@ -268,7 +268,7 @@ user_info_record(
     user_info_record(UserGroup, UserID, Password, IsSuperuser, State).
 
 user_info_record(UserGroup, UserID, Password, IsSuperuser, State) ->
-    {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State),
+    {StoredKey, ServerKey, Salt} = sasl_auth_scram:generate_authentication_info(Password, State),
     #user_info{
         user_id = {UserGroup, UserID},
         stored_key = StoredKey,
@@ -282,7 +282,7 @@ fields_to_update(
     [keys_and_salt | Rest],
     State
 ) ->
-    {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State),
+    {StoredKey, ServerKey, Salt} = sasl_auth_scram:generate_authentication_info(Password, State),
     [
         {keys_and_salt, {StoredKey, ServerKey, Salt}}
         | fields_to_update(UserInfo, Rest, State)

+ 12 - 12
apps/emqx_auth_mnesia/test/emqx_authn_scram_mnesia_SUITE.erl

@@ -107,7 +107,7 @@ t_authenticate(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -130,7 +130,7 @@ t_authenticate(_Config) ->
     ) = receive_packet(),
 
     {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -155,7 +155,7 @@ t_authenticate(_Config) ->
         #{'Authentication-Data' := ServerFinalMessage}
     ) = receive_packet(),
 
-    ok = esasl_scram:check_server_final_message(
+    ok = sasl_auth_scram:check_server_final_message(
         ServerFinalMessage, ClientCache#{algorithm => Algorithm}
     ).
 
@@ -190,7 +190,7 @@ t_authenticate_bad_username(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(<<"badusername">>),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(<<"badusername">>),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -215,7 +215,7 @@ t_authenticate_bad_password(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -235,7 +235,7 @@ t_authenticate_bad_password(_Config) ->
     ) = receive_packet(),
 
     {continue, ClientFinalMessage, _ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -333,7 +333,7 @@ t_update_user_keys(_Config) ->
 
     {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     ConnectPacket = ?CONNECT_PACKET(
         #mqtt_packet_connect{
@@ -353,7 +353,7 @@ t_update_user_keys(_Config) ->
     ) = receive_packet(),
 
     {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -378,7 +378,7 @@ t_update_user_keys(_Config) ->
         #{'Authentication-Data' := ServerFinalMessage}
     ) = receive_packet(),
 
-    ok = esasl_scram:check_server_final_message(
+    ok = sasl_auth_scram:check_server_final_message(
         ServerFinalMessage, ClientCache#{algorithm => Algorithm}
     ).
 
@@ -447,7 +447,7 @@ test_is_superuser(UserInfo, ExpectedIsSuperuser) ->
 
     {ok, _} = emqx_authn_scram_mnesia:add_user(UserInfo0, State),
 
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
+    ClientFirstMessage = sasl_auth_scram:client_first_message(Username),
 
     {continue, ServerFirstMessage, ServerCache} =
         emqx_authn_scram_mnesia:authenticate(
@@ -460,7 +460,7 @@ test_is_superuser(UserInfo, ExpectedIsSuperuser) ->
         ),
 
     {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
+        sasl_auth_scram:check_server_first_message(
             ServerFirstMessage,
             #{
                 client_first_message => ClientFirstMessage,
@@ -479,7 +479,7 @@ test_is_superuser(UserInfo, ExpectedIsSuperuser) ->
             State
         ),
 
-    ok = esasl_scram:check_server_final_message(
+    ok = sasl_auth_scram:check_server_final_message(
         ServerFinalMessage, ClientCache#{algorithm => sha512}
     ),
 

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -10,7 +10,7 @@
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}},
-    {sasl_auth, {git, "https://github.com/kafka4beam/sasl_auth.git", {tag, "v2.2.0"}}}
+    {sasl_auth, "2.3.0"}
 ]}.
 
 {shell, [

+ 8 - 1
apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl

@@ -16,7 +16,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    Config.
+    [{work_dir, emqx_cth_suite:work_dir(Config)} | Config].
 
 end_per_suite(_Config) ->
     ok.
@@ -25,6 +25,11 @@ init_per_testcase(TCName, Config) ->
     emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config).
 
 end_per_testcase(TCName, Config) ->
+    %% @NOTE: Clean work_dir for this TC to avoid running out of disk space
+    %% causing other test run flaky. Uncomment it if you need to preserve the
+    %% work_dir for troubleshooting
+    t_config_update_ds =:= TCName andalso
+        emqx_cth_suite:clean_work_dir(?config(work_dir, Config)),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).
 
 mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) ->
@@ -372,6 +377,8 @@ t_config_update_ds('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(cluster_b, Config)).
 
 t_config_update_ds(Config) ->
+    %% @NOTE: for troubleshooting this TC,
+    %% take a look in end_per_testcase/2 to preserve the work dir
     [NodeA1, _, _] = ?config(cluster_a, Config),
     [NodeB1, _] = ?config(cluster_b, Config),
     LPortA = ?config(lport_a, Config),

+ 4 - 0
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -1066,6 +1066,10 @@ groups() ->
         {skipstream_lts, TCs}
     ].
 
+init_per_suite(Config) ->
+    application:set_env(kernel, prevent_overlapping_partitions, false),
+    Config.
+
 init_per_group(Group, Config) ->
     LayoutConf =
         case Group of

+ 2 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -132,10 +132,10 @@ init_data(#share{topic = Topic} = ShareTopicFilter, StartTime) ->
     Group = group_name(ShareTopicFilter),
     case emqx_ds_shared_sub_leader_store:open(Group) of
         Store when Store =/= false ->
-            ?tp(warning, shared_sub_leader_store_open, #{topic => ShareTopicFilter, store => Store}),
+            ?tp(debug, shared_sub_leader_store_open, #{topic => ShareTopicFilter, store => Store}),
             ok;
         false ->
-            ?tp(warning, shared_sub_leader_store_init, #{topic => ShareTopicFilter}),
+            ?tp(debug, shared_sub_leader_store_init, #{topic => ShareTopicFilter}),
             RankProgress = emqx_ds_shared_sub_leader_rank_progress:init(),
             Store0 = emqx_ds_shared_sub_leader_store:init(Group),
             Store1 = emqx_ds_shared_sub_leader_store:set(start_time, StartTime, Store0),

+ 0 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -38,7 +38,6 @@
         [
             emqx,
             emqx_conf,
-            esasl,
             emqx_utils,
             emqx_durable_storage,
             emqx_ds_backends,

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_api_relup.erl

@@ -451,7 +451,8 @@ validate_name(Name) ->
     {204}.
 
 '/relup/status'(get, _) ->
-    {[_ | _] = Res, []} = emqx_mgmt_api_relup_proto_v1:get_upgrade_status_from_all_nodes(),
+    Nodes = emqx:running_nodes(),
+    {[_ | _] = Res, []} = emqx_mgmt_api_relup_proto_v1:get_upgrade_status_from_nodes(Nodes),
     case
         lists:filter(
             fun

+ 4 - 4
apps/emqx_management/src/proto/emqx_mgmt_api_relup_proto_v1.erl

@@ -22,7 +22,7 @@
 -export([
     introduced_in/0,
     run_upgrade/1,
-    get_upgrade_status_from_all_nodes/0,
+    get_upgrade_status_from_nodes/1,
     get_upgrade_status/1
 ]).
 
@@ -36,9 +36,9 @@ introduced_in() ->
 run_upgrade(Nodes) ->
     rpc:multicall(Nodes, emqx_mgmt_api_relup, emqx_relup_upgrade, [], ?RPC_TIMEOUT_OP).
 
--spec get_upgrade_status_from_all_nodes() -> emqx_rpc:multicall_result().
-get_upgrade_status_from_all_nodes() ->
-    rpc:multicall(emqx_mgmt_api_relup, get_upgrade_status, [], ?RPC_TIMEOUT_INFO).
+-spec get_upgrade_status_from_nodes([node()]) -> emqx_rpc:multicall_result().
+get_upgrade_status_from_nodes(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_relup, get_upgrade_status, [], ?RPC_TIMEOUT_INFO).
 
 -spec get_upgrade_status(node()) -> emqx_rpc:call_result(map()).
 get_upgrade_status(Node) ->

+ 66 - 22
apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl

@@ -89,6 +89,7 @@ fetch_cluster_consistented_data() ->
     Connectors = emqx_connector:list(),
     (maybe_collect_schema_registry())#{
         rules_ov_data => rules_ov_data(Rules),
+        actions_ov_data => actions_ov_data(Rules),
         connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
     }.
 
@@ -121,9 +122,20 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
 
     %% Data Integration Overview
-    ok = add_collect_family(Callback, rules_ov_metric_meta(), ?MG(rules_ov_data, RawData)),
     ok = add_collect_family(
-        Callback, connectors_ov_metric_meta(), ?MG(connectors_ov_data, RawData)
+        Callback,
+        rules_ov_metric_meta(),
+        ?MG(rules_ov_data, RawData)
+    ),
+    ok = add_collect_family(
+        Callback,
+        actions_ov_metric_meta(),
+        ?MG(actions_ov_data, RawData)
+    ),
+    ok = add_collect_family(
+        Callback,
+        connectors_ov_metric_meta(),
+        ?MG(connectors_ov_data, RawData)
     ),
     ok = maybe_collect_family_schema_registry(Callback),
 
@@ -188,6 +200,9 @@ collect_metrics(Name, Metrics) ->
 %% Rules
 collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
 %%====================
+%% Actions
+collect_di(K = emqx_actions_count, Data) -> gauge_metric(?MG(K, Data));
+%%====================
 %% Schema Registry
 collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
 %%====================
@@ -262,6 +277,32 @@ rules_ov_data(_Rules) ->
         emqx_rules_count => ets:info(?RULE_TAB, size)
     }.
 
+%%====================
+%% Actions
+
+actions_ov_metric_meta() ->
+    [
+        {emqx_actions_count, gauge}
+    ].
+
+actions_ov_metric(names) ->
+    emqx_prometheus_cluster:metric_names(actions_ov_metric_meta()).
+
+actions_ov_data(Rules) ->
+    ActionsCount = lists:foldl(
+        fun
+            (#{actions := Actions} = _Rule, AccIn) ->
+                AccIn + length(Actions);
+            (_, AccIn) ->
+                AccIn
+        end,
+        0,
+        Rules
+    ),
+    #{
+        emqx_actions_count => ActionsCount
+    }.
+
 %%====================
 %% Schema Registry
 
@@ -366,24 +407,21 @@ rule_point(Mode, Id, V) ->
     {with_node_label(Mode, [{id, Id}]), V}.
 
 get_metric(#{id := Id, enable := Bool} = _Rule) ->
-    case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
-        #{counters := Counters} ->
-            #{
-                emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
-                emqx_rule_matched => ?MG(matched, Counters),
-                emqx_rule_failed => ?MG(failed, Counters),
-                emqx_rule_passed => ?MG(passed, Counters),
-                emqx_rule_failed_exception => ?MG('failed.exception', Counters),
-                emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
-                emqx_rule_actions_total => ?MG('actions.total', Counters),
-                emqx_rule_actions_success => ?MG('actions.success', Counters),
-                emqx_rule_actions_failed => ?MG('actions.failed', Counters),
-                emqx_rule_actions_failed_out_of_service => ?MG(
-                    'actions.failed.out_of_service', Counters
-                ),
-                emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
-            }
-    end.
+    #{counters := Counters} =
+        emqx_metrics_worker:get_metrics(rule_metrics, Id),
+    #{
+        emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
+        emqx_rule_matched => ?MG(matched, Counters),
+        emqx_rule_failed => ?MG(failed, Counters),
+        emqx_rule_passed => ?MG(passed, Counters),
+        emqx_rule_failed_exception => ?MG('failed.exception', Counters),
+        emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
+        emqx_rule_actions_total => ?MG('actions.total', Counters),
+        emqx_rule_actions_success => ?MG('actions.success', Counters),
+        emqx_rule_actions_failed => ?MG('actions.failed', Counters),
+        emqx_rule_actions_failed_out_of_service => ?MG('actions.failed.out_of_service', Counters),
+        emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
+    }.
 
 %%====================
 %% Action Metric
@@ -546,6 +584,7 @@ get_connector_status(ResourceData) ->
 %% merge / zip formatting funcs for type `application/json`
 collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
     RulesD = rules_ov_data(Rules),
+    ActionsD = actions_ov_data(Rules),
     ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
 
     M1 = lists:foldl(
@@ -554,13 +593,18 @@ collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
         rules_ov_metric(names)
     ),
     M2 = lists:foldl(
+        fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end,
+        #{},
+        actions_ov_metric(names)
+    ),
+    M3 = lists:foldl(
         fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
         #{},
         connectors_ov_metric(names)
     ),
-    M3 = maybe_collect_schema_registry(),
+    M4 = maybe_collect_schema_registry(),
 
-    lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
+    lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]).
 
 collect_json_data(Data) ->
     emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).

+ 4 - 1
apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl

@@ -349,6 +349,7 @@ metric_meta(<<"emqx_authz_", _Tail/binary>>) -> ?meta(1, 1, 2);
 metric_meta(<<"emqx_banned_count">>) -> ?meta(0, 0, 0);
 %% `/prometheus/data_integration`
 metric_meta(<<"emqx_rules_count">>) -> ?meta(0, 0, 0);
+metric_meta(<<"emqx_actions_count">>) -> ?meta(0, 0, 0);
 metric_meta(<<"emqx_connectors_count">>) -> ?meta(0, 0, 0);
 metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
 metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
@@ -808,6 +809,7 @@ assert_json_data__data_integration_overview(M, _) ->
         #{
             emqx_connectors_count := _,
             emqx_rules_count := _,
+            emqx_actions_count := _,
             emqx_schema_registrys_count := _
         },
         M
@@ -818,7 +820,8 @@ assert_json_data__data_integration_overview(M, _) ->
     ?assertMatch(
         #{
             emqx_connectors_count := _,
-            emqx_rules_count := _
+            emqx_rules_count := _,
+            emqx_actions_count := _
         },
         M
     ).

+ 2 - 2
apps/emqx_utils/src/emqx_utils_scram.erl

@@ -47,7 +47,7 @@ check_client_first_message(
     Bin, _Cache, #{iteration_count := IterationCount}, RetrieveFun, OnErrFun
 ) ->
     case
-        esasl_scram:check_client_first_message(
+        sasl_auth_scram:check_client_first_message(
             Bin,
             #{
                 iteration_count => IterationCount,
@@ -66,7 +66,7 @@ check_client_first_message(
 
 check_client_final_message(Bin, Cache, #{algorithm := Alg}, OnErrFun, ResultKeys) ->
     case
-        esasl_scram:check_client_final_message(
+        sasl_auth_scram:check_client_final_message(
             Bin,
             Cache#{algorithm => Alg}
         )

+ 69 - 47
bin/emqx

@@ -29,6 +29,14 @@ logwarn() {
     fi
 }
 
+loginfo() {
+    if [ "${TERM:-dumb}" = dumb ]; then
+        echo "INFO: $*"
+    else
+        echo "$(tput setaf 2)INFO: $*$(tput sgr0)"
+    fi
+}
+
 logdebug() {
     if [ "$DEBUG" -eq 1 ]; then
         echo "DEBUG: $*"
@@ -63,15 +71,55 @@ else
     RUNNER_ROOT_DIR="$(cd "$(dirname "$(realpath "$0" || echo "$0")")"/..; pwd -P)"
 fi
 
+COMMAND="${1:-}"
+GREP='grep --color=never'
+
+if [ -z "$COMMAND" ]; then
+    usage 'help'
+    exit 1
+elif [ "$COMMAND" = 'help' ]; then
+    usage 'help'
+    exit 0
+fi
+
+if [ "${2:-}" = 'help' ]; then
+    ## 'ctl' command has its own usage info
+    if [ "$COMMAND" != 'ctl' ]; then
+        usage "$COMMAND"
+        exit 0
+    fi
+fi
+
+## IS_BOOT_COMMAND is set for later to inspect node name and cookie from hocon config (or env variable)
+case "${COMMAND}" in
+    start|console|console_clean|foreground|check_config)
+        IS_BOOT_COMMAND='yes'
+        ;;
+    ertspath)
+        echo "$ERTS_DIR"
+        exit 0
+        ;;
+    root_dir)
+        echo "$RUNNER_ROOT_DIR"
+        exit 0
+        ;;
+    *)
+        IS_BOOT_COMMAND='no'
+        ;;
+esac
+
+RELUP_DIR="relup"
 BASE_RUNNER_ROOT_DIR="${BASE_RUNNER_ROOT_DIR:-$RUNNER_ROOT_DIR}"
+RELUP_PATH="$RUNNER_ROOT_DIR/$RELUP_DIR"
 
-if [ -f "$RUNNER_ROOT_DIR/relup/version" ]; then
-    TARGET_VSN=$(cat "$RUNNER_ROOT_DIR/relup/version")
+if [ -f "$RELUP_PATH/version" ]; then
+    TARGET_VSN=$(cat "$RELUP_PATH/version")
     export BASE_RUNNER_ROOT_DIR
-    logwarn "Loading emqx from hot upgrade dir: $RUNNER_ROOT_DIR/relup"
-    exec "$RUNNER_ROOT_DIR"/relup/"$TARGET_VSN"/bin/emqx "$@"
-else
-    logdebug "Loading emqx from $RUNNER_ROOT_DIR"
+    ## only print for boot commands to avoid messing the CLI outputs
+    if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
+        loginfo "Loading emqx from hot-upgrade dir: $RELUP_PATH"
+    fi
+    exec "$RELUP_PATH/$TARGET_VSN"/bin/emqx "$@"
 fi
 
 # shellcheck disable=SC1090,SC1091
@@ -246,42 +294,6 @@ usage() {
     esac
 }
 
-COMMAND="${1:-}"
-GREP='grep --color=never'
-
-if [ -z "$COMMAND" ]; then
-    usage 'help'
-    exit 1
-elif [ "$COMMAND" = 'help' ]; then
-    usage 'help'
-    exit 0
-fi
-
-if [ "${2:-}" = 'help' ]; then
-    ## 'ctl' command has its own usage info
-    if [ "$COMMAND" != 'ctl' ]; then
-        usage "$COMMAND"
-        exit 0
-    fi
-fi
-
-## IS_BOOT_COMMAND is set for later to inspect node name and cookie from hocon config (or env variable)
-case "${COMMAND}" in
-    start|console|console_clean|foreground|check_config)
-        IS_BOOT_COMMAND='yes'
-        ;;
-    ertspath)
-        echo "$ERTS_DIR"
-        exit 0
-        ;;
-    root_dir)
-        echo "$RUNNER_ROOT_DIR"
-        exit 0
-        ;;
-    *)
-        IS_BOOT_COMMAND='no'
-        ;;
-esac
 
 ## backward compatible
 if [ -d "$ERTS_DIR/lib" ]; then
@@ -464,17 +476,25 @@ call_hocon() {
         || die "call_hocon_failed: $*" $?
 }
 
-find_emqx_process() {
+check_emqx_process() {
+    local rootdir="$1"
     ## Find the running node from 'ps -ef'
     ##  * The grep args like '[e]mqx' but not 'emqx' is to avoid greping the grep command itself
     ##  * The running 'remsh' and 'nodetool' processes must be excluded
+    ps -ef | $GREP '[e]mqx' | $GREP -v -E '(remsh|nodetool)' | $GREP -oE "\-[r]oot ${rootdir}.*" || true
+}
+
+find_emqx_process() {
+    ## Maybe the emqx has been hot upgraded and is still running from the base root_dir.
+    ## So instead of searching RUNNER_ROOT_DIR, we only search for processes running
+    ## from BASE_RUNNER_ROOT_DIR (which is either equal to RUNNER_ROOT_DIR or a
+    ## parent directory of it).
+    local rootdir="${BASE_RUNNER_ROOT_DIR}"
     if [ -n "${EMQX_NODE__NAME:-}" ]; then
         # if node name is provided, filter by node name
-        # shellcheck disable=SC2009
-        ps -ef | $GREP '[e]mqx' | $GREP -v -E '(remsh|nodetool)' | $GREP -E "\s-s?name\s${EMQX_NODE__NAME}" | $GREP -oE "\-[r]oot ${RUNNER_ROOT_DIR}.*" || true
+        check_emqx_process "${rootdir}" | $GREP -E "\s-s?name\s${EMQX_NODE__NAME}" || true
     else
-        # shellcheck disable=SC2009
-        ps -ef | $GREP '[e]mqx' | $GREP -v -E '(remsh|nodetool)' | $GREP -oE "\-[r]oot ${RUNNER_ROOT_DIR}.*" || true
+        check_emqx_process "${rootdir}"
     fi
 }
 
@@ -1071,6 +1091,8 @@ nodetool_shutdown() {
     logger -t "${REL_NAME}[${PID}]" "STOP: OK"
 }
 
+## make sure the CWD of emqx is BASE_RUNNER_ROOT_DIR, so relative paths like "etc/"
+## and "data/" still work.
 cd "$BASE_RUNNER_ROOT_DIR"
 
 case "${COMMAND}" in

+ 2 - 0
changes/ce/feat-13665.en.md

@@ -0,0 +1,2 @@
+Add a metric `emqx_actions_count` to the prometheus endpoint.
+To count the number of all actions added by all rules, including Republish actions and Console Output actions.

+ 1 - 5
mix.exs

@@ -131,7 +131,6 @@ defmodule EMQXUmbrella.MixProject do
       common_dep(:snabbkaffe),
       common_dep(:hocon),
       common_dep(:emqx_http_lib),
-      common_dep(:esasl),
       common_dep(:jose),
       # in conflict by ehttpc and emqtt
       common_dep(:gun),
@@ -212,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do
 
   # in conflict by emqx_connector and system_monitor
   def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}
-  def common_dep(:esasl), do: {:esasl, github: "emqx/esasl", tag: "0.2.1"}
+  def common_dep(:sasl_auth), do: {:sasl_auth, "2.3.0", override: true}
   def common_dep(:gen_rpc), do: {:gen_rpc, github: "emqx/gen_rpc", tag: "3.4.0", override: true}
 
   def common_dep(:system_monitor),
@@ -270,9 +269,6 @@ defmodule EMQXUmbrella.MixProject do
       system_env: emqx_app_system_env()
     }
 
-  def common_dep(:sasl_auth),
-    do: {:sasl_auth, github: "kafka4beam/sasl_auth", tag: "v2.2.0", override: true}
-
   ###############################################################################################
   # BEGIN DEPRECATED FOR MIX BLOCK
   # These should be removed once we fully migrate to mix

+ 1 - 1
rebar.config

@@ -99,7 +99,7 @@
     {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.3"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
-    {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
+    {sasl_auth, "2.3.0"},
     {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},
     {telemetry, "1.1.0"},
     {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}},