Просмотр исходного кода

Merge pull request #13406 from thalesmg/20240703-m-sync-r57

sync release-57 to master
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
e9265b88e5

+ 13 - 8
apps/emqx/mix.exs

@@ -28,7 +28,7 @@ defmodule EMQX.MixProject do
   def application do
     [
       ## FIXME!!! go though emqx.app.src and add missing stuff...
-      extra_applications: [:public_key, :ssl, :os_mon, :logger, :mnesia] ++ UMP.extra_applications(),
+      extra_applications: [:public_key, :ssl, :os_mon, :logger, :mnesia, :sasl] ++ UMP.extra_applications(),
       mod: {:emqx_app, []}
     ]
   end
@@ -37,14 +37,19 @@ defmodule EMQX.MixProject do
     ## FIXME!!! go though emqx.app.src and add missing stuff...
     [
       {:emqx_utils, in_umbrella: true},
-      {:emqx_ds_backends, in_umbrella: true},
+      # {:emqx_ds_backends, in_umbrella: true},
 
-      {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true},
-      {:esockd, github: "emqx/esockd", tag: "5.11.2"},
-      {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
-      {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true},
-      {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
-      {:ranch, github: "emqx/ranch", tag: "1.8.1-emqx", override: true},
+      UMP.common_dep(:gproc),
+      UMP.common_dep(:gen_rpc),
+      UMP.common_dep(:ekka),
+      UMP.common_dep(:esockd),
+      UMP.common_dep(:cowboy),
+      UMP.common_dep(:lc),
+      UMP.common_dep(:hocon),
+      UMP.common_dep(:ranch),
+      UMP.common_dep(:bcrypt),
+      UMP.common_dep(:pbkdf2),
+      UMP.common_dep(:emqx_http_lib),
     ] ++ UMP.quicer_dep()
   end
 

+ 1 - 1
apps/emqx_auth_http/mix.exs

@@ -29,7 +29,7 @@ defmodule EMQXAuthHTTP.MixProject do
       {:emqx_auth, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
       {:emqx_connector, in_umbrella: true},
-      {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true}
+      UMP.common_dep(:hocon)
     ]
   end
 end

+ 1 - 1
apps/emqx_auth_jwt/mix.exs

@@ -28,7 +28,7 @@ defmodule EMQXAuthJWT.MixProject do
       {:emqx, in_umbrella: true},
       {:emqx_auth, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
-      {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}
+      UMP.common_dep(:jose),
     ]
   end
 end

+ 5 - 1
apps/emqx_auth_mnesia/mix.exs

@@ -24,6 +24,10 @@ defmodule EMQXAuthMnesia.MixProject do
   end
 
   def deps() do
-    [{:emqx, in_umbrella: true}, {:emqx_auth, in_umbrella: true}]
+    [
+      {:emqx, in_umbrella: true},
+      {:emqx_auth, in_umbrella: true},
+      UMP.common_dep(:esasl),
+    ]
   end
 end

+ 1 - 1
apps/emqx_bridge_http/mix.exs

@@ -27,7 +27,7 @@ defmodule EMQXBridgeHTTP.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.13"}
+      UMP.common_dep(:ehttpc),
     ]
   end
 end

+ 1 - 1
apps/emqx_bridge_mqtt/mix.exs

@@ -27,7 +27,7 @@ defmodule EMQXBridgeMQTT.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
-      {:emqtt, github: "emqx/emqtt", tag: "1.10.1", system_env: UMP.maybe_no_quic_env()}
+      UMP.common_dep(:emqtt),
     ]
   end
 end

+ 4 - 4
apps/emqx_connector/mix.exs

@@ -33,11 +33,11 @@ defmodule EMQXConnector.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
-      {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
-      {:ecpool, github: "emqx/ecpool", tag: "0.5.7"},
+      UMP.common_dep(:jose),
+      UMP.common_dep(:ecpool),
       {:eredis_cluster, github: "emqx/eredis_cluster", tag: "0.8.4"},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.13"},
-      {:emqtt, github: "emqx/emqtt", tag: "1.10.1", system_env: UMP.maybe_no_quic_env()}
+      UMP.common_dep(:ehttpc),
+      UMP.common_dep(:emqtt),
     ]
   end
 end

+ 2 - 2
apps/emqx_durable_storage/mix.exs

@@ -29,8 +29,8 @@ defmodule EMQXDurableStorage.MixProject do
   def deps() do
     [
       {:emqx_utils, in_umbrella: true},
-      {:gproc, github: "emqx/gproc", tag: "0.9.0.1"},
-      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5"},
+      UMP.common_dep(:rocksdb),
+      UMP.common_dep(:gproc),
       {:ra, "2.7.3"},
     ]
   end

+ 3 - 3
apps/emqx_enterprise/mix.exs

@@ -23,9 +23,9 @@ defmodule EMQXEnterprise.MixProject do
 
   def deps() do
     [
-      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.10"},
-      {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1"},
-      {:hocon, github: "emqx/hocon", tag: "0.42.2"}
+      UMP.common_dep(:snabbkaffe),
+      UMP.common_dep(:typerefl),
+      UMP.common_dep(:hocon),
     ]
   end
 end

+ 1 - 1
apps/emqx_exhook/mix.exs

@@ -38,7 +38,7 @@ defmodule EMQXExhook.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_utils, in_umbrella: true},
-      {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}
+      UMP.common_dep(:grpc)
     ]
   end
 

+ 1 - 1
apps/emqx_gateway_exproto/mix.exs

@@ -37,7 +37,7 @@ defmodule EMQXGatewayExproto.MixProject do
       {:emqx, in_umbrella: true},
       {:emqx_utils, in_umbrella: true},
       {:emqx_gateway, in_umbrella: true},
-      {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}
+      UMP.common_dep(:grpc)
     ]
   end
 end

+ 1 - 1
apps/emqx_gcp_device/mix.exs

@@ -29,7 +29,7 @@ defmodule EMQXGCPDevice.MixProject do
     [
       {:emqx, in_umbrella: true},
       {:emqx_auth, in_umbrella: true},
-      {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}
+      UMP.common_dep(:jose),
     ]
   end
 

+ 1 - 1
apps/emqx_machine/mix.exs

@@ -29,7 +29,7 @@ defmodule EMQXMachine.MixProject do
       {:emqx_conf, in_umbrella: true, runtime: false},
       {:emqx_dashboard, in_umbrella: true, runtime: false},
       {:emqx_management, in_umbrella: true, runtime: false},
-      {:covertool, github: "zmstone/covertool", tag: "2.0.4.1"}
+      UMP.common_dep(:covertool),
     ]
   end
 end

+ 1 - 0
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -66,6 +66,7 @@ stop_apps() ->
     ?SLOG(notice, #{msg => "stopping_emqx_apps"}),
     _ = emqx_alarm_handler:unload(),
     ok = emqx_conf_app:unset_config_loaded(),
+    ok = emqx_plugins:ensure_stopped(),
     lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
 
 %% Those port apps are terminated after the main apps

+ 7 - 1
apps/emqx_management/src/emqx_mgmt_auth.erl

@@ -357,6 +357,11 @@ init_bootstrap_file(File) ->
             init_bootstrap_file(File, Dev, MP);
         {error, Reason0} ->
             Reason = emqx_utils:explain_posix(Reason0),
+            FmtReason = emqx_utils:format(
+                "load API bootstrap file failed, file:~ts, reason:~ts",
+                [File, Reason]
+            ),
+
             ?SLOG(
                 error,
                 #{
@@ -365,7 +370,8 @@ init_bootstrap_file(File) ->
                     reason => Reason
                 }
             ),
-            {error, Reason}
+
+            {error, FmtReason}
     end.
 
 init_bootstrap_file(File, Dev, MP) ->

+ 31 - 14
apps/emqx_management/test/emqx_mgmt_api_cluster_SUITE.erl

@@ -78,19 +78,26 @@ t_cluster_topology_api_replicants(Config) ->
             [
                 #{
                     core_node := Core1,
-                    replicant_nodes :=
-                        [#{node := Replicant, streams := _}]
+                    replicant_nodes := _
                 },
                 #{
                     core_node := Core2,
-                    replicant_nodes :=
-                        [#{node := Replicant, streams := _}]
+                    replicant_nodes := _
                 }
             ],
             Resp
         )
      || Resp <- [lists:sort(R) || R <- [Core1Resp, Core2Resp, ReplResp]]
-    ].
+    ],
+    %% Occasionally, the replicant may decide to not connect to one core (seen at tests)...
+    Core1RespReplicants = lists:usort([
+        Rep
+     || R <- [Core1Resp, Core2Resp, ReplResp],
+        #{replicant_nodes := Reps} <- R,
+        #{node := Rep} <- Reps
+    ]),
+    ?assertMatch([Replicant], Core1RespReplicants),
+    ok.
 
 t_cluster_invite_api_timeout(Config) ->
     %% assert the cluster is created
@@ -100,17 +107,22 @@ t_cluster_invite_api_timeout(Config) ->
         [
             #{
                 core_node := Core1,
-                replicant_nodes :=
-                    [#{node := Replicant, streams := _}]
+                replicant_nodes := _
             },
             #{
                 core_node := Core2,
-                replicant_nodes :=
-                    [#{node := Replicant, streams := _}]
+                replicant_nodes := _
             }
         ],
         lists:sort(Core1Resp)
     ),
+    %% Occasionally, the replicant may decide to connect to one core (seen at tests)...
+    Core1RespReplicants = lists:usort([
+        Rep
+     || #{replicant_nodes := Reps} <- Core1Resp,
+        #{node := Rep} <- Reps
+    ]),
+    ?assertMatch([Replicant], Core1RespReplicants),
 
     %% force leave the core2
     {204} = rpc:call(
@@ -181,17 +193,22 @@ t_cluster_invite_async(Config) ->
         [
             #{
                 core_node := Core1,
-                replicant_nodes :=
-                    [#{node := Replicant, streams := _}]
+                replicant_nodes := _
             },
             #{
                 core_node := Core2,
-                replicant_nodes :=
-                    [#{node := Replicant, streams := _}]
+                replicant_nodes := _
             }
         ],
         lists:sort(Core1Resp)
     ),
+    %% Occasionally, the replicant may decide to connect to one core (seen at tests)...
+    Core1RespReplicants = lists:usort([
+        Rep
+     || #{replicant_nodes := Reps} <- Core1Resp,
+        #{node := Rep} <- Reps
+    ]),
+    ?assertMatch([Replicant], Core1RespReplicants),
 
     %% force leave the core2
     {204} = rpc:call(
@@ -206,7 +223,7 @@ t_cluster_invite_async(Config) ->
         [
             #{
                 core_node := Core1,
-                replicant_nodes := [_]
+                replicant_nodes := _
             }
         ],
         lists:sort(Core1Resp2)

+ 30 - 6
apps/emqx_message_transformation/src/emqx_message_transformation.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_message_transformation).
 
+-feature(maybe_expr, enable).
+
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
@@ -54,11 +56,18 @@
 
 -type eval_context() :: #{
     client_attrs := map(),
+    clientid := _,
+    flags := _,
+    id := _,
+    node := _,
     payload := _,
+    peername := _,
+    publish_received_at := _,
     qos := _,
     retain := _,
     topic := _,
     user_property := _,
+    username := _,
     dirty := #{
         payload => true,
         qos => true,
@@ -323,20 +332,35 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
             true -> #{};
             false -> #{payload => true}
         end,
-    UserProperties0 = maps:get(
-        'User-Property',
-        emqx_message:get_header(properties, Message, #{}),
-        []
-    ),
+    Flags = emqx_message:get_flags(Message),
+    Props = emqx_message:get_header(properties, Message, #{}),
+    UserProperties0 = maps:get('User-Property', Props, []),
     UserProperties = maps:from_list(UserProperties0),
+    Headers = Message#message.headers,
+    Peername =
+        case maps:get(peername, Headers, undefined) of
+            Peername0 when is_tuple(Peername0) ->
+                iolist_to_binary(emqx_utils:ntoa(Peername0));
+            _ ->
+                undefined
+        end,
+    Username = maps:get(username, Headers, undefined),
     #{
         dirty => Dirty,
+
         client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
+        clientid => Message#message.from,
+        flags => Flags,
+        id => emqx_guid:to_hexstr(Message#message.id),
+        node => node(),
         payload => Payload,
+        peername => Peername,
+        publish_received_at => Message#message.timestamp,
         qos => Message#message.qos,
         retain => emqx_message:get_flag(retain, Message, false),
         topic => Message#message.topic,
-        user_property => UserProperties
+        user_property => UserProperties,
+        username => Username
     }.
 
 -spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->

+ 79 - 7
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -285,14 +285,17 @@ connect(ClientId, IsPersistent) ->
     connect(ClientId, IsPersistent, _Opts = #{}).
 
 connect(ClientId, IsPersistent, Opts) ->
+    StartProps = maps:get(start_props, Opts, #{}),
     Properties0 = maps:get(properties, Opts, #{}),
     Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent),
-    {ok, Client} = emqtt:start_link([
-        {clean_start, true},
-        {clientid, ClientId},
-        {properties, Properties},
-        {proto_ver, v5}
-    ]),
+    Defaults = #{
+        clean_start => true,
+        clientid => ClientId,
+        properties => Properties,
+        proto_ver => v5
+    },
+    Props = emqx_utils_maps:deep_merge(Defaults, StartProps),
+    {ok, Client} = emqtt:start_link(Props),
     {ok, _} = emqtt:connect(Client),
     on_exit(fun() -> catch emqtt:stop(Client) end),
     Client.
@@ -496,11 +499,21 @@ assert_monitor_metrics() ->
     ),
     ok.
 
+-define(assertReceiveReturn(PATTERN, TIMEOUT),
+    (fun() ->
+        receive
+            PATTERN = ____Msg0 -> ____Msg0
+        after TIMEOUT ->
+            error({message_not_received, ?LINE})
+        end
+    end)()
+).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-%% Smoke test where we have a single check and `all_pass' strategy.
+%% Smoke test where we have an example transfomration.
 t_smoke_test(_Config) ->
     Name1 = <<"foo">>,
     Operations = [
@@ -588,6 +601,65 @@ t_smoke_test(_Config) ->
 
     ok.
 
+%% A smoke test for a subset of read-only context fields.
+%%   * clientid
+%%   * id
+%%   * node
+%%   * peername
+%%   * publish_received_at
+%%   * username
+t_smoke_test_2(_Config) ->
+    Name1 = <<"foo">>,
+    Operations = [
+        operation(<<"payload.clientid">>, <<"clientid">>),
+        operation(<<"payload.id">>, <<"id">>),
+        operation(<<"payload.node">>, <<"node">>),
+        operation(<<"payload.peername">>, <<"peername">>),
+        operation(<<"payload.publish_received_at">>, <<"publish_received_at">>),
+        operation(<<"payload.username">>, <<"username">>),
+        operation(<<"payload.flags">>, <<"flags">>)
+    ],
+    Transformation1 = transformation(Name1, Operations),
+    {201, _} = insert(Transformation1),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C1 = connect(ClientId),
+    {ok, _, [_]} = emqtt:subscribe(C1, <<"t/#">>, [{qos, 2}]),
+    ok = publish(C1, <<"t/1">>, #{}),
+    {publish, #{payload := Payload0}} = ?assertReceiveReturn({publish, _}, 1_000),
+    NodeBin = atom_to_binary(node()),
+    ?assertMatch(
+        #{
+            <<"clientid">> := ClientId,
+            <<"id">> := <<_/binary>>,
+            <<"node">> := NodeBin,
+            <<"peername">> := <<"127.0.0.1:", _/binary>>,
+            <<"publish_received_at">> := PRAt,
+            <<"username">> := <<"undefined">>,
+            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
+        } when is_integer(PRAt),
+        emqx_utils_json:decode(Payload0, [return_maps])
+    ),
+    %% Reconnect with an username.
+    emqtt:stop(C1),
+    Username = <<"myusername">>,
+    C2 = connect(ClientId, _IsPersistent = false, #{start_props => #{username => Username}}),
+    {ok, _, [_]} = emqtt:subscribe(C2, <<"t/#">>, [{qos, 2}]),
+    ok = publish(C2, <<"t/1">>, #{}),
+    {publish, #{payload := Payload1}} = ?assertReceiveReturn({publish, _}, 1_000),
+    ?assertMatch(
+        #{
+            <<"clientid">> := ClientId,
+            <<"id">> := <<_/binary>>,
+            <<"node">> := NodeBin,
+            <<"peername">> := <<"127.0.0.1:", _/binary>>,
+            <<"publish_received_at">> := PRAt,
+            <<"username">> := Username,
+            <<"flags">> := #{<<"dup">> := false, <<"retain">> := false}
+        } when is_integer(PRAt),
+        emqx_utils_json:decode(Payload1, [return_maps])
+    ),
+    ok.
+
 t_crud(_Config) ->
     ?assertMatch({200, []}, list()),
 

+ 1 - 1
apps/emqx_plugins/src/emqx_plugins.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugins, [
     {description, "EMQX Plugin Management"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {modules, []},
     {mod, {emqx_plugins_app, []}},
     {applications, [kernel, stdlib, emqx, erlavro]},

+ 7 - 5
apps/emqx_plugins/src/emqx_plugins.erl

@@ -299,8 +299,10 @@ ensure_stopped() ->
     Fun = fun
         (#{name_vsn := NameVsn, enable := true}) ->
             case ensure_stopped(NameVsn) of
-                ok -> [];
-                {error, Reason} -> [{NameVsn, Reason}]
+                ok ->
+                    [];
+                {error, Reason} ->
+                    [{NameVsn, Reason}]
             end;
         (#{name_vsn := NameVsn, enable := false}) ->
             ?SLOG(debug, #{msg => "plugin_disabled", action => stop_plugin, name_vsn => NameVsn}),
@@ -1077,15 +1079,15 @@ stop_app(App) ->
     case application:stop(App) of
         ok ->
             ?SLOG(debug, #{msg => "stop_plugin_successfully", app => App}),
-            ok = unload_moudle_and_app(App);
+            ok = unload_module_and_app(App);
         {error, {not_started, App}} ->
             ?SLOG(debug, #{msg => "plugin_not_started", app => App}),
-            ok = unload_moudle_and_app(App);
+            ok = unload_module_and_app(App);
         {error, Reason} ->
             throw(#{msg => "failed_to_stop_app", app => App, reason => Reason})
     end.
 
-unload_moudle_and_app(App) ->
+unload_module_and_app(App) ->
     case application:get_key(App, modules) of
         {ok, Modules} ->
             lists:foreach(fun code:soft_purge/1, Modules);

+ 2 - 0
apps/emqx_plugins/src/emqx_plugins_app.erl

@@ -19,6 +19,7 @@
 -behaviour(application).
 
 -include("emqx_plugins.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
     start/2,
@@ -31,6 +32,7 @@ start(_Type, _Args) ->
     ok = emqx_plugins:ensure_installed(),
     ok = emqx_plugins:ensure_started(),
     ok = emqx_config_handler:add_handler([?CONF_ROOT], emqx_plugins),
+    ?tp("emqx_plugins_app_started", #{}),
     {ok, Sup}.
 
 stop(_State) ->

+ 150 - 0
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -48,6 +48,8 @@
 -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2").
 -define(PACKAGE_SUFFIX, ".tar.gz").
 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
 all() ->
     [
         {group, copy_plugin},
@@ -140,6 +142,39 @@ bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8);
 bin(B) when is_binary(B) -> B.
 
+hookpoints() ->
+    [
+        'client.connect',
+        'client.connack',
+        'client.connected',
+        'client.disconnected',
+        'client.authenticate',
+        'client.authorize',
+        'client.subscribe',
+        'client.unsubscribe',
+        'session.created',
+        'session.subscribed',
+        'session.unsubscribed',
+        'session.resumed',
+        'session.discarded',
+        'session.takenover',
+        'session.terminated',
+        'message.publish',
+        'message.puback',
+        'message.delivered',
+        'message.acked',
+        'message.dropped'
+    ].
+
+get_hook_modules() ->
+    lists:flatmap(
+        fun(HookPoint) ->
+            CBs = emqx_hooks:lookup(HookPoint),
+            [Mod || {callback, {Mod, _Fn, _Args}, _Filter, _Prio} <- CBs]
+        end,
+        hookpoints()
+    ).
+
 t_demo_install_start_stop_uninstall({init, Config}) ->
     Opts = #{package := Package} = get_demo_plugin_package(),
     NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
@@ -256,9 +291,18 @@ t_start_restart_and_stop({init, Config}) ->
 t_start_restart_and_stop({'end', _Config}) ->
     ok;
 t_start_restart_and_stop(Config) ->
+    %% pre-condition
+    Hooks0 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks0), #{hooks => Hooks0}),
+
     NameVsn = proplists:get_value(name_vsn, Config),
     ok = emqx_plugins:ensure_installed(NameVsn),
     ok = emqx_plugins:ensure_enabled(NameVsn),
+
+    %% Application is not yet started.
+    Hooks1 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks1), #{hooks => Hooks1}),
+
     FakeInfo =
         "name=bar, rel_vsn=\"2\", rel_apps=[\"bar-9\"],"
         "description=\"desc bar\"",
@@ -271,6 +315,10 @@ t_start_restart_and_stop(Config) ->
     ok = emqx_plugins:ensure_started(),
     assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
 
+    %% Should have called the application start callback, which in turn adds hooks.
+    Hooks2 = get_hook_modules(),
+    ?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks2), #{hooks => Hooks2}),
+
     %% fake enable bar-2
     ok = ensure_state(Bar2, rear, true),
     %% should cause an error
@@ -292,6 +340,10 @@ t_start_restart_and_stop(Config) ->
     assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
     ok = ensure_state(Bar2, rear, false),
 
+    %% Should have called the application stop callback, which removes the hooks.
+    Hooks3 = get_hook_modules(),
+    ?assertNot(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks3), #{hooks => Hooks3}),
+
     ok = emqx_plugins:restart(NameVsn),
     assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
     %% repeat
@@ -371,6 +423,15 @@ assert_app_running(Name, false) ->
     AllApps = application:which_applications(),
     ?assertEqual(false, lists:keyfind(Name, 1, AllApps)).
 
+assert_started_and_hooks_loaded() ->
+    PluginConfig = emqx_plugins:list(),
+    ct:pal("plugin config:\n  ~p", [PluginConfig]),
+    ?assertMatch([_], PluginConfig),
+    assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
+    Hooks = get_hook_modules(),
+    ?assert(lists:member(?EMQX_PLUGIN_APP_NAME, Hooks), #{hooks => Hooks}),
+    ok.
+
 t_bad_tar_gz({init, Config}) ->
     Config;
 t_bad_tar_gz({'end', _Config}) ->
@@ -841,6 +902,95 @@ group_t_cluster_leave(Config) ->
     ),
     ok.
 
+%% Checks that starting a node with a plugin enabled starts it correctly, and that the
+%% hooks added by the plugin's `application:start/2' callback are indeed in place.
+%% See also: https://github.com/emqx/emqx/issues/13378
+t_start_node_with_plugin_enabled({init, Config}) ->
+    #{package := Package, shdir := InstallDir} = get_demo_plugin_package(),
+    NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
+    AppSpecs = [
+        emqx,
+        emqx_conf,
+        emqx_ctl,
+        {emqx_plugins, #{
+            config =>
+                #{
+                    plugins =>
+                        #{
+                            install_dir => InstallDir,
+                            states =>
+                                [
+                                    #{
+                                        enable => true,
+                                        name_vsn => NameVsn
+                                    }
+                                ]
+                        }
+                }
+        }}
+    ],
+    Name1 = t_cluster_start_enabled1,
+    Name2 = t_cluster_start_enabled2,
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {Name1, #{role => core, apps => AppSpecs, join_to => undefined}},
+            {Name2, #{role => core, apps => AppSpecs, join_to => undefined}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    Names = [Name1, Name2],
+    Nodes = [emqx_cth_cluster:node_name(N) || N <- Names],
+    [
+        {node_specs, Specs},
+        {nodes, Nodes},
+        {name_vsn, NameVsn}
+        | Config
+    ];
+t_start_node_with_plugin_enabled({'end', Config}) ->
+    Nodes = ?config(nodes, Config),
+    ok = emqx_cth_cluster:stop(Nodes),
+    ok;
+t_start_node_with_plugin_enabled(Config) when is_list(Config) ->
+    NodeSpecs = ?config(node_specs, Config),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            [N1, N2 | _] = emqx_cth_cluster:start(NodeSpecs),
+            ?ON(N1, assert_started_and_hooks_loaded()),
+            ?ON(N2, assert_started_and_hooks_loaded()),
+            %% Now make them join.
+            %% N.B.: We need to start autocluster so that applications are restarted in
+            %% order, and also we need to override the config loader to emulate what
+            %% `emqx_cth_cluster' does and avoid the node crashing due to lack of config
+            %% keys.
+            ok = ?ON(N2, emqx_machine_boot:start_autocluster()),
+            ?ON(N2, begin
+                StartCallback0 =
+                    case ekka:env({callback, start}) of
+                        {ok, SC0} -> SC0;
+                        _ -> fun() -> ok end
+                    end,
+                StartCallback = fun() ->
+                    ok = emqx_app:set_config_loader(emqx_cth_suite),
+                    StartCallback0()
+                end,
+                ekka:callback(start, StartCallback)
+            end),
+            {ok, {ok, _}} =
+                ?wait_async_action(
+                    ?ON(N2, ekka:join(N1)),
+                    #{?snk_kind := "emqx_plugins_app_started"}
+                ),
+            ct:pal("checking N1 state"),
+            ?ON(N1, assert_started_and_hooks_loaded()),
+            ct:pal("checking N2 state"),
+            ?ON(N2, assert_started_and_hooks_loaded()),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 make_tar(Cwd, NameWithVsn) ->
     make_tar(Cwd, NameWithVsn, NameWithVsn).
 

+ 1 - 1
apps/emqx_postgresql/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXPostgresql.MixProject do
 
   def deps() do
     [
-      {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2"},
+      UMP.common_dep(:epgsql),
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true}
     ]

+ 4 - 4
apps/emqx_resource/mix.exs

@@ -26,10 +26,10 @@ defmodule EMQXResource.MixProject do
   def deps() do
     [
       {:emqx, in_umbrella: true},
-      {:ecpool, github: "emqx/ecpool", tag: "0.5.7"},
-      {:gproc, github: "emqx/gproc", tag: "0.9.0.1"},
-      {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0"},
-      {:telemetry, "1.1.0"}
+      UMP.common_dep(:ecpool),
+      UMP.common_dep(:gproc),
+      UMP.common_dep(:jsx),
+      UMP.common_dep(:telemetry),
     ]
   end
 end

+ 2 - 6
apps/emqx_rule_engine/mix.exs

@@ -29,12 +29,8 @@ defmodule EMQXRuleEngine.MixProject do
       {:emqx_modules, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true},
-      {:emqtt,
-       github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()}
+      UMP.common_dep(:rulesql),
+      UMP.common_dep(:emqtt),
     ]
   end
-
-  defp maybe_no_quic_env() do
-    UMP.maybe_no_quic_env()
-  end
 end

+ 2 - 2
apps/emqx_s3/mix.exs

@@ -27,8 +27,8 @@ defmodule EMQXS3.MixProject do
   def deps() do
     [
       {:emqx, in_umbrella: true},
-      {:gproc, github: "emqx/gproc", tag: "0.9.0.1"},
-      {:ehttpc, github: "emqx/ehttpc", tag: "0.4.13"},
+      UMP.common_dep(:gproc),
+      UMP.common_dep(:ehttpc),
       {:erlcloud, github: "emqx/erlcloud", tag: "3.7.0.3"},
       {:emqx_bridge_http, in_umbrella: true, runtime: false}
     ]

+ 1 - 1
apps/emqx_schema_registry/mix.exs

@@ -28,7 +28,7 @@ defmodule EMQXSchemaRegistry.MixProject do
       {:emqx_rule_engine, in_umbrella: true},
       {:erlavro, github: "emqx/erlavro", tag: "2.10.0"},
       {:jesse, github: "emqx/jesse", tag: "1.8.0"},
-      {:gpb, "4.19.9"}
+      UMP.common_dep(:gpb),
     ]
   end
 end

+ 3 - 3
apps/emqx_utils/mix.exs

@@ -26,9 +26,9 @@ defmodule EMQXUtils.MixProject do
 
   def deps() do
     [
-      {:jiffy, github: "emqx/jiffy", tag: "1.0.6"},
-      {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3"},
-      {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.10", override: true},
+      UMP.common_dep(:jiffy),
+      UMP.common_dep(:emqx_http_lib),
+      UMP.common_dep(:snabbkaffe),
     ]
   end
 end

+ 1 - 0
changes/ce/fix-13393.en.md

@@ -0,0 +1 @@
+Fixed an issue where plugin applications were not restarted after a node joins a cluster, leading to an inconsistent state where hooks were not properly installed.

+ 208 - 85
mix.exs

@@ -29,107 +29,230 @@ defmodule EMQXUmbrella.MixProject do
       tarball along with the release.
   """
 
+  # TODO: remove once we switch to the new mix build
+  def new_mix_build?() do
+    System.get_env("NEW_MIX_BUILD") == "1"
+  end
+
   def project() do
     profile_info = check_profile!()
     version = pkg_vsn()
 
-    [
-      # TODO: these lines will be uncommented when we switch to using mix as the manager
-      # for all umbrella apps.
-      # apps_path: "apps",
-      # apps: applications(profile_info.release_type, profile_info.edition_type) |> Keyword.keys(),
-
-      app: :emqx_mix,
-      erlc_options: erlc_options(profile_info, version),
-      version: version,
-      deps: deps(profile_info, version),
-      releases: releases(),
-      aliases: aliases()
-    ]
+    if new_mix_build?() do
+      [
+        # TODO: these lines will be uncommented when we switch to using mix as the manager
+        # for all umbrella apps.
+        apps_path: "apps",
+        apps:
+          applications(profile_info.release_type, profile_info.edition_type) |> Keyword.keys(),
+        erlc_options: erlc_options(profile_info, version),
+        version: version,
+        deps: deps(profile_info, version),
+        releases: releases(),
+        aliases: aliases()
+      ]
+    else
+      # TODO: this check and clause will be removed when we switch to using mix as the
+      # manager for all umbrella apps.
+      [
+        app: :emqx_mix,
+        erlc_options: erlc_options(profile_info, version),
+        version: version,
+        deps: deps(profile_info, version),
+        releases: releases(),
+        aliases: aliases()
+      ]
+    end
   end
 
-  defp deps(profile_info, version) do
+  @doc """
+  Please try to add dependencies that used by a single umbrella application in the
+  application's own `mix.exs` file, if possible.  If it's shared by more than one
+  application, or if the dependency requires an `override: true` option, add a new clause
+  to `common_dep/1` so that we centralize versions in this root `mix.exs` file as much as
+  possible.
+
+  Here, transitive dependencies from our app dependencies should be placed when there's a
+  need to override them.  For example, since `jsone` is a dependency to `rocketmq` and to
+  `erlavro`, which are both dependencies and not umbrella apps, we need to add the
+  override here.  Also, there are cases where adding `override: true` to the umbrella
+  application dependency simply won't satisfy mix.  In such cases, it's fine to add it
+  here.
+  """
+  def deps(profile_info, version) do
     # we need several overrides here because dependencies specify
     # other exact versions, and not ranges.
 
-    ## TODO: this should be removed once we migrate the release build to mix
+    if new_mix_build?() do
+      new_deps()
+    else
+      old_deps(profile_info, version)
+    end
+  end
+
+  def new_deps() do
+    quicer_dep() ++
+      jq_dep() ++
+      extra_release_apps() ++
+      overridden_deps()
+  end
+
+  ## TODO: this should be removed once we migrate the release build to mix
+  defp old_deps(profile_info, version) do
     rebar3_umbrella_apps = emqx_apps(profile_info, version) ++ enterprise_deps(profile_info)
 
     common_deps() ++
-      [
-        {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
-        {:redbug, github: "emqx/redbug", tag: "2.0.10"},
-        {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
-        {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
-        {:ehttpc, github: "emqx/ehttpc", tag: "0.4.14", override: true},
-        {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
-        {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
-        {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
-        {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
-        {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-6", override: true},
-        {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true},
-        {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
-        {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
-        {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true},
-        {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},
-        {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true},
-        {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
-        # maybe forbid to fetch quicer
-        {:emqtt,
-         github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()},
-        {:rulesql, github: "emqx/rulesql", tag: "0.2.1"},
-        {:observer_cli, "1.7.1"},
-        {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.5"},
-        {:telemetry, "1.1.0", override: true},
-        # in conflict by emqtt and hocon
-        {:getopt, "1.0.2", override: true},
-        {
-          :snabbkaffe,
-          ## without this, snabbkaffe is compiled with `-define(snk_kind, '$kind')`, which
-          ## will basically make events in tests never match any predicates.
-          github: "kafka4beam/snabbkaffe",
-          tag: "1.0.10",
-          override: true,
-          system_env: emqx_app_system_env(profile_info, version)
-        },
-        {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true},
-        {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true},
-        {:esasl, github: "emqx/esasl", tag: "0.2.1"},
-        {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2", override: true},
-        # in conflict by ehttpc and emqtt
-        {:gun, github: "emqx/gun", tag: "1.3.11", override: true},
-        # in conflict by emqx_connector and system_monitor
-        {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true},
-        # in conflict by emqx and observer_cli
-        {:recon, github: "ferd/recon", tag: "2.5.1", override: true},
-        {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true},
-        # in conflict by erlavro and rocketmq
-        {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true},
-        # dependencies of dependencies; we choose specific refs to match
-        # what rebar3 chooses.
-        # in conflict by gun and emqtt
-        {:cowlib,
-         github: "ninenines/cowlib",
-         ref: "c6553f8308a2ca5dcd69d845f0a7d098c40c3363",
-         override: true},
-        # in conflict by cowboy_swagger and cowboy
-        {:ranch, github: "emqx/ranch", tag: "1.8.1-emqx", override: true},
-        # in conflict by grpc and eetcd
-        {:gpb, "4.19.9", override: true, runtime: false},
-        {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true},
-        # set by hackney (dependency)
-        {:ssl_verify_fun, "1.1.7", override: true},
-        {:rfc3339, github: "emqx/rfc3339", tag: "0.2.3", override: true},
-        {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true},
-        {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
-        {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
-        {:ra, "2.7.3", override: true},
-        {:mimerl, "1.2.0", override: true}
-      ] ++
+      extra_release_apps() ++
+      overridden_deps() ++
       jq_dep() ++
       quicer_dep() ++ rebar3_umbrella_apps
   end
 
+  def overridden_deps() do
+    [
+      common_dep(:lc),
+      common_dep(:covertool),
+      common_dep(:typerefl),
+      common_dep(:ehttpc),
+      common_dep(:gproc),
+      common_dep(:jiffy),
+      common_dep(:cowboy),
+      common_dep(:esockd),
+      common_dep(:rocksdb),
+      common_dep(:ekka),
+      common_dep(:gen_rpc),
+      common_dep(:grpc),
+      common_dep(:minirest),
+      common_dep(:ecpool),
+      common_dep(:replayq),
+      common_dep(:pbkdf2),
+      # maybe forbid to fetch quicer
+      common_dep(:emqtt),
+      common_dep(:rulesql),
+      common_dep(:telemetry),
+      # in conflict by emqtt and hocon
+      common_dep(:getopt),
+      common_dep(:snabbkaffe),
+      common_dep(:hocon),
+      common_dep(:emqx_http_lib),
+      common_dep(:esasl),
+      common_dep(:jose),
+      # in conflict by ehttpc and emqtt
+      common_dep(:gun),
+      # in conflict by emqx_connector and system_monitor
+      common_dep(:epgsql),
+      # in conflict by emqx and observer_cli
+      {:recon, github: "ferd/recon", tag: "2.5.1", override: true},
+      common_dep(:jsx),
+      # in conflict by erlavro and rocketmq
+      common_dep(:jsone),
+      # dependencies of dependencies; we choose specific refs to match
+      # what rebar3 chooses.
+      # in conflict by gun and emqtt
+      common_dep(:cowlib),
+      # in conflict by cowboy_swagger and cowboy
+      common_dep(:ranch),
+      # in conflict by grpc and eetcd
+      common_dep(:gpb),
+      {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true},
+      # set by hackney (dependency)
+      {:ssl_verify_fun, "1.1.7", override: true},
+      common_dep(:rfc3339),
+      common_dep(:bcrypt),
+      {:uuid, github: "okeuday/uuid", tag: "v2.0.6", override: true},
+      {:quickrand, github: "okeuday/quickrand", tag: "v2.0.6", override: true},
+      {:ra, "2.7.3", override: true},
+      {:mimerl, "1.2.0", override: true}
+    ]
+  end
+
+  def extra_release_apps() do
+    [
+      {:redbug, github: "emqx/redbug", tag: "2.0.10"},
+      {:observer_cli, "1.7.1"},
+      {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.5"}
+    ]
+  end
+
+  def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
+  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}
+  def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
+  def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true}
+  def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
+  # in conflict by ehttpc and emqtt
+  def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}
+  # in conflict by cowboy_swagger and cowboy
+  def common_dep(:ranch), do: {:ranch, github: "emqx/ranch", tag: "1.8.1-emqx", override: true}
+  def common_dep(:ehttpc), do: {:ehttpc, github: "emqx/ehttpc", tag: "0.4.14", override: true}
+  def common_dep(:jiffy), do: {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}
+  def common_dep(:grpc), do: {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}
+  def common_dep(:cowboy), do: {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}
+  def common_dep(:jsone), do: {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}
+  def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}
+  def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true}
+  def common_dep(:jsx), do: {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}
+  # in conflict by emqtt and hocon
+  def common_dep(:getopt), do: {:getopt, "1.0.2", override: true}
+  def common_dep(:telemetry), do: {:telemetry, "1.1.0", override: true}
+  # in conflict by grpc and eetcd
+  def common_dep(:gpb), do: {:gpb, "4.19.9", override: true, runtime: false}
+
+  def common_dep(:covertool),
+    do: {:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true}
+
+  # in conflict by emqx_connector and system_monitor
+  def common_dep(:epgsql), do: {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true}
+  def common_dep(:esasl), do: {:esasl, github: "emqx/esasl", tag: "0.2.1"}
+  def common_dep(:gen_rpc), do: {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}
+
+  def common_dep(:jose),
+    do: {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2", override: true}
+
+  def common_dep(:rulesql), do: {:rulesql, github: "emqx/rulesql", tag: "0.2.1"}
+
+  def common_dep(:pbkdf2),
+    do: {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}
+
+  def common_dep(:bcrypt),
+    do: {:bcrypt, github: "emqx/erlang-bcrypt", tag: "0.6.2", override: true}
+
+  # hex version 0.2.2 used by `jesse` has buggy mix.exs
+  def common_dep(:rfc3339), do: {:rfc3339, github: "emqx/rfc3339", tag: "0.2.3", override: true}
+
+  def common_dep(:minirest),
+    do: {:minirest, github: "emqx/minirest", tag: "1.4.3", override: true}
+
+  # maybe forbid to fetch quicer
+  def common_dep(:emqtt),
+    do:
+      {:emqtt,
+       github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()}
+
+  def common_dep(:typerefl),
+    do: {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}
+
+  def common_dep(:rocksdb),
+    do: {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-6", override: true}
+
+  def common_dep(:emqx_http_lib),
+    do: {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true}
+
+  def common_dep(:cowlib),
+    do:
+      {:cowlib,
+       github: "ninenines/cowlib", ref: "c6553f8308a2ca5dcd69d845f0a7d098c40c3363", override: true}
+
+  def common_dep(:snabbkaffe),
+    do: {
+      :snabbkaffe,
+      ## without this, snabbkaffe is compiled with `-define(snk_kind, '$kind')`, which
+      ## will basically make events in tests never match any predicates.
+      github: "kafka4beam/snabbkaffe",
+      tag: "1.0.10",
+      override: true,
+      system_env: emqx_app_system_env(profile_info(), pkg_vsn())
+    }
+
   ###############################################################################################
   # BEGIN DEPRECATED FOR MIX BLOCK
   # These should be removed once we fully migrate to mix