Преглед на файлове

Merge pull request #11652 from id/0921-e5.3.0-code-freeze

e5.3.0 code freeze
Ivan Dyachkov преди 2 години
родител
ревизия
2e9f451df3

+ 1 - 0
.github/workflows/_push-entrypoint.yaml

@@ -13,6 +13,7 @@ on:
       - 'master'
       - 'release-51'
       - 'release-52'
+      - 'release-53'
       - 'ci/**'
 
 env:

+ 1 - 1
.github/workflows/build_packages_cron.yaml

@@ -21,8 +21,8 @@ jobs:
       matrix:
         profile:
           - ['emqx', 'master']
-          - ['emqx-enterprise', 'release-51']
           - ['emqx-enterprise', 'release-52']
+          - ['emqx-enterprise', 'release-53']
         otp:
           - 25.3.2-2
         arch:

+ 3 - 2
apps/emqx/include/asserts.hrl

@@ -30,11 +30,12 @@
     )
 ).
 
--define(drainMailbox(),
+-define(drainMailbox(), ?drainMailbox(0)).
+-define(drainMailbox(TIMEOUT),
     (fun F__Flush_() ->
         receive
             X__Msg_ -> [X__Msg_ | F__Flush_()]
-        after 0 -> []
+        after TIMEOUT -> []
         end
     end)()
 ).

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.2.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.2.1").
+-define(EMQX_RELEASE_EE, "5.3.0-alpha.1").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 6 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -102,13 +102,18 @@ init_per_group(Group, Config) when Group == quic ->
         [
             {emqx,
                 ?config(emqx_config, Config) ++
-                    "\n listeners.quic.test { enable = true }"}
+                    "\n listeners.quic.test {"
+                    "\n   enable = true"
+                    "\n   ssl_options.verify = verify_peer"
+                    "\n }"}
         ],
         #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [
         {port, get_listener_port(quic, test)},
         {conn_fun, quic_connect},
+        {ssl_opts, emqx_common_test_helpers:client_ssl_twoway()},
+        {ssl, true},
         {group_apps, Apps}
         | Config
     ];

+ 68 - 71
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -19,14 +19,14 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_cm.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(TOPIC, <<"t">>).
 -define(CNT, 100).
+-define(SLEEP, 10).
 
 %%--------------------------------------------------------------------
 %% Initial funcs
@@ -49,89 +49,86 @@ end_per_suite(Config) ->
 
 t_takeover(_) ->
     process_flag(trap_exit, true),
-    AllMsgs = messages(?CNT),
-    Pos = rand:uniform(?CNT),
-
     ClientId = <<"clientid">>,
-    {ok, C1} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
-    {ok, _} = emqtt:connect(C1),
-    emqtt:subscribe(C1, <<"t">>, 1),
-
-    spawn(fun() ->
-        [
-            begin
-                emqx:publish(lists:nth(I, AllMsgs)),
-                timer:sleep(rand:uniform(10))
-            end
-         || I <- lists:seq(1, Pos)
-        ]
-    end),
-    emqtt:pause(C1),
-    timer:sleep(?CNT * 10),
-
-    load_meck(ClientId),
-    spawn(fun() ->
-        [
-            begin
-                emqx:publish(lists:nth(I, AllMsgs)),
-                timer:sleep(rand:uniform(10))
-            end
-         || I <- lists:seq(Pos + 1, ?CNT)
-        ]
+    Middle = ?CNT div 2,
+    Client1Msgs = messages(0, Middle),
+    Client2Msgs = messages(Middle, ?CNT div 2),
+    AllMsgs = Client1Msgs ++ Client2Msgs,
+
+    meck:new(emqx_cm, [non_strict, passthrough]),
+    meck:expect(emqx_cm, takeover_session_end, fun(Arg) ->
+        ok = timer:sleep(?SLEEP * 2),
+        meck:passthrough([Arg])
     end),
-    {ok, C2} = emqtt:start_link([{clientid, ClientId}, {clean_start, false}]),
-    {ok, _} = emqtt:connect(C2),
 
-    Received = all_received_publishs(),
-    ct:pal("middle: ~p, received: ~p", [Pos, [P || {publish, #{payload := P}} <- Received]]),
+    Commands =
+        [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
+            [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++
+            [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++
+            [{fun stop_client/1, []}],
+
+    FCtx = lists:foldl(
+        fun({Fun, Args}, Ctx) ->
+            ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
+            apply(Fun, [Ctx | Args])
+        end,
+        #{},
+        Commands
+    ),
+
+    #{client := [CPid2, CPid1]} = FCtx,
+    ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}),
+    ?assertReceive({'EXIT', CPid2, normal}),
+
+    Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)],
+    ct:pal("middle: ~p", [Middle]),
+    ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
     assert_messages_missed(AllMsgs, Received),
     assert_messages_order(AllMsgs, Received),
 
-    emqtt:disconnect(C2),
-    unload_meck(ClientId).
+    meck:unload(emqx_cm),
+    ok.
 
 t_takover_in_cluster(_) ->
     todo.
 
 %%--------------------------------------------------------------------
-%% Helpers
+%% Commands
+
+start_client(Ctx, ClientId, Topic, Qos) ->
+    {ok, CPid} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {clean_start, false}
+    ]),
+    _ = erlang:spawn_link(fun() ->
+        {ok, _} = emqtt:connect(CPid),
+        ct:pal("CLIENT: connected ~p", [CPid]),
+        {ok, _, [Qos]} = emqtt:subscribe(CPid, Topic, Qos)
+    end),
+    Ctx#{client => [CPid | maps:get(client, Ctx, [])]}.
 
-load_meck(ClientId) ->
-    meck:new(fake_conn_mod, [non_strict]),
-    HookTakeover = fun
-        (Pid, Msg = {takeover, 'begin'}) ->
-            emqx_connection:call(Pid, Msg);
-        (Pid, Msg = {takeover, 'end'}) ->
-            timer:sleep(?CNT * 10),
-            emqx_connection:call(Pid, Msg);
-        (Pid, Msg) ->
-            emqx_connection:call(Pid, Msg)
-    end,
-    meck:expect(fake_conn_mod, call, HookTakeover),
-    [ChanPid] = emqx_cm:lookup_channels(ClientId),
-    ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId),
-    NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}},
-    true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}).
-
-unload_meck(_ClientId) ->
-    meck:unload(fake_conn_mod).
-
-all_received_publishs() ->
-    all_received_publishs([]).
-
-all_received_publishs(Ls) ->
-    receive
-        M = {publish, _Pub} -> all_received_publishs([M | Ls]);
-        _ -> all_received_publishs(Ls)
-    after 100 ->
-        lists:reverse(Ls)
+publish_msg(Ctx, Msg) ->
+    ok = timer:sleep(rand:uniform(?SLEEP)),
+    case emqx:publish(Msg) of
+        [] -> publish_msg(Ctx, Msg);
+        [_ | _] -> Ctx
     end.
 
+stop_client(Ctx = #{client := [CPid | _]}) ->
+    ok = timer:sleep(?SLEEP),
+    ok = emqtt:stop(CPid),
+    Ctx.
+
+%%--------------------------------------------------------------------
+%% Helpers
+
 assert_messages_missed(Ls1, Ls2) ->
     Missed = lists:filtermap(
         fun(Msg) ->
             No = emqx_message:payload(Msg),
-            case lists:any(fun({publish, #{payload := No1}}) -> No1 == No end, Ls2) of
+            case lists:any(fun(#{payload := No1}) -> No1 == No end, Ls2) of
                 true -> false;
                 false -> {true, No}
             end
@@ -148,7 +145,7 @@ assert_messages_missed(Ls1, Ls2) ->
 
 assert_messages_order([], []) ->
     ok;
-assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) ->
+assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) ->
     case emqx_message:payload(Msg) == No of
         false ->
             ct:fail("Message order is not correct, expected: ~p, received: ~p", [
@@ -159,8 +156,8 @@ assert_messages_order([Msg | Ls1], [{publish, #{payload := No}} | Ls2]) ->
             assert_messages_order(Ls1, Ls2)
     end.
 
-messages(Cnt) ->
-    [emqx_message:make(ct, 1, ?TOPIC, payload(I)) || I <- lists:seq(1, Cnt)].
+messages(Offset, Cnt) ->
+    [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)].
 
 payload(I) ->
     % NOTE

+ 5 - 0
apps/emqx_dashboard/src/emqx_dashboard.erl

@@ -210,6 +210,11 @@ filter_false(K, V, S) -> [{K, V} | S].
 listener_name(Protocol) ->
     list_to_atom(atom_to_list(Protocol) ++ ":dashboard").
 
+-if(?EMQX_RELEASE_EDITION =/= ee).
+%% dialyzer complains about the `unauthorized_role' clause...
+-dialyzer({no_match, [authorize/1]}).
+-endif.
+
 authorize(Req) ->
     case cowboy_req:parse_header(<<"authorization">>, Req) of
         {basic, Username, Password} ->

+ 4 - 0
apps/emqx_dashboard/src/emqx_dashboard_admin.erl

@@ -374,6 +374,10 @@ sign_token(Username, Password) ->
             Error
     end.
 
+-spec verify_token(_, Token :: binary()) ->
+    Result ::
+        {ok, binary()}
+        | {error, token_timeout | not_found | unauthorized_role}.
 verify_token(Req, Token) ->
     emqx_dashboard_token:verify(Req, Token).
 

+ 26 - 20
apps/emqx_dashboard/src/emqx_dashboard_token.erl

@@ -122,23 +122,16 @@ do_sign(#?ADMIN{username = Username} = User, Password) ->
     _ = mria:transaction(?DASHBOARD_SHARD, fun mnesia:write/1, [JWTRec]),
     {ok, Token}.
 
+-spec do_verify(_, Token :: binary()) ->
+    Result ::
+        {ok, binary()}
+        | {error, token_timeout | not_found | unauthorized_role}.
 do_verify(Req, Token) ->
     case lookup(Token) of
-        {ok, JWT = #?ADMIN_JWT{exptime = ExpTime, extra = Extra, username = Username}} ->
+        {ok, JWT = #?ADMIN_JWT{exptime = ExpTime, extra = _Extra, username = _Username}} ->
             case ExpTime > erlang:system_time(millisecond) of
                 true ->
-                    case check_rbac(Req, Extra) of
-                        true ->
-                            NewJWT = JWT#?ADMIN_JWT{exptime = jwt_expiration_time()},
-                            {atomic, Res} = mria:transaction(
-                                ?DASHBOARD_SHARD,
-                                fun mnesia:write/1,
-                                [NewJWT]
-                            ),
-                            {Res, Username};
-                        _ ->
-                            {error, unauthorized_role}
-                    end;
+                    check_rbac(Req, JWT);
                 _ ->
                     {error, token_timeout}
             end;
@@ -254,15 +247,28 @@ clean_expired_jwt(Now) ->
     ok = destroy(JWTList).
 
 -if(?EMQX_RELEASE_EDITION == ee).
-check_rbac(Req, Extra) ->
-    emqx_dashboard_rbac:check_rbac(Req, Extra).
+check_rbac(Req, JWT) ->
+    #?ADMIN_JWT{exptime = _ExpTime, extra = Extra, username = _Username} = JWT,
+    case emqx_dashboard_rbac:check_rbac(Req, Extra) of
+        true ->
+            save_new_jwt(JWT);
+        _ ->
+            {error, unauthorized_role}
+    end.
 
 -else.
 
--dialyzer({nowarn_function, [check_rbac/2]}).
--dialyzer({no_match, [do_verify/2]}).
-
-check_rbac(_Req, _Extra) ->
-    true.
+check_rbac(_Req, JWT) ->
+    save_new_jwt(JWT).
 
 -endif.
+
+save_new_jwt(OldJWT) ->
+    #?ADMIN_JWT{exptime = _ExpTime, extra = _Extra, username = Username} = OldJWT,
+    NewJWT = OldJWT#?ADMIN_JWT{exptime = jwt_expiration_time()},
+    {atomic, Res} = mria:transaction(
+        ?DASHBOARD_SHARD,
+        fun mnesia:write/1,
+        [NewJWT]
+    ),
+    {Res, Username}.

+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.2.1
+version: 5.3.0-alpha.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.2.1
+appVersion: 5.3.0-alpha.1

+ 9 - 7
scripts/rel/cut.sh

@@ -22,6 +22,7 @@ options:
   -b|--base:         Specify the current release base branch, can be one of
                      release-51
                      release-52
+                     release-53
                      NOTE: this option should be used when --dryrun.
 
   --dryrun:          Do not actually create the git tag.
@@ -33,15 +34,10 @@ options:
                      If this option is absent, the tag found by git describe will be used
 
 
-For 5.1 series the current working branch must be 'release-51'
+For 5.X series the current working branch must be 'release-5X'
       --.--[  master  ]---------------------------.-----------.---
          \\                                      /
-          \`---[release-51]----(v5.1.1 | e5.1.1)
-
-For 5.2 series the current working branch must be 'release-52'
-      --.--[  master  ]---------------------------.-----------.---
-         \\                                      /
-          \`---[release-52]----(v5.2.1 | e5.2.1)
+          \`---[release-53]----(v5.3.1 | e5.3.1)
 EOF
 }
 
@@ -134,6 +130,12 @@ rel_branch() {
         e5.2.*)
             echo 'release-52'
             ;;
+        v5.3.*)
+            echo 'release-53'
+            ;;
+        e5.3.*)
+            echo 'release-53'
+            ;;
         *)
             logerr "Unsupported version tag $TAG"
             exit 1

+ 7 - 3
scripts/rel/sync-remotes.sh

@@ -5,7 +5,7 @@ set -euo pipefail
 # ensure dir
 cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
 
-BASE_BRANCHES=( 'release-52' 'release-51' 'master' )
+BASE_BRANCHES=( 'release-53' 'release-52' 'release-51' 'master' )
 
 usage() {
     cat <<EOF
@@ -18,9 +18,10 @@ options:
     It tries to merge (by default with --ff-only option)
     upstreams branches for the current working branch.
     The uppstream branch of the current branch are as below:
+    * release-53: []        # no upstream for 5.3 opensource edition
     * release-52: []        # no upstream for 5.2 opensource edition
     * release-51: []        # no upstream for 5.1 opensource edition
-    * master: [release-52]  # sync release-52 to master
+    * master: [release-53]  # sync release-53 to master
 
   -b|--base:
     The base branch of current working branch if currently is not
@@ -152,6 +153,9 @@ remote_refs() {
 upstream_branches() {
     local base="$1"
     case "$base" in
+        release-53)
+            remote_ref "$base"
+            ;;
         release-52)
             remote_ref "$base"
             ;;
@@ -159,7 +163,7 @@ upstream_branches() {
             remote_ref "$base"
             ;;
         master)
-            remote_refs "$base" 'release-52'
+            remote_refs "$base" 'release-53'
             ;;
     esac
 }

+ 50 - 0
scripts/shelltest/parse-git-ref.test

@@ -3,6 +3,11 @@
 Unrecognized tag: refs/tags/v5.2.0-foobar.1
 >>>= 1
 
+./parse-git-ref.sh refs/tags/v5.3.0-foobar.1
+>>>2
+Unrecognized tag: refs/tags/v5.3.0-foobar.1
+>>>= 1
+
 ./parse-git-ref.sh v5.2.0
 >>>2
 Unrecognized git ref: v5.2.0
@@ -18,6 +23,21 @@ Unrecognized git ref: v5.2.0-1
 Unrecognized git ref: e5.2.0-1
 >>>= 1
 
+./parse-git-ref.sh v5.3.0
+>>>2
+Unrecognized git ref: v5.3.0
+>>>= 1
+
+./parse-git-ref.sh v5.3.0-1
+>>>2
+Unrecognized git ref: v5.3.0-1
+>>>= 1
+
+./parse-git-ref.sh e5.3.0-1
+>>>2
+Unrecognized git ref: e5.3.0-1
+>>>= 1
+
 ./parse-git-ref.sh refs/tags/v5.1.0
 >>>
 {"profile": "emqx", "release": true, "latest": false}
@@ -33,6 +53,11 @@ Unrecognized git ref: e5.2.0-1
 {"profile": "emqx", "release": true, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/tags/v5.3.0-alpha.1
+>>>
+{"profile": "emqx", "release": true, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/tags/v5.2.0-alpha-1
 >>>2
 Unrecognized tag: refs/tags/v5.2.0-alpha-1
@@ -43,6 +68,11 @@ Unrecognized tag: refs/tags/v5.2.0-alpha-1
 {"profile": "emqx", "release": true, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/tags/v5.3.0-beta.1
+>>>
+{"profile": "emqx", "release": true, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/tags/v5.2.0-rc.1
 >>>
 {"profile": "emqx", "release": true, "latest": false}
@@ -63,16 +93,31 @@ Unrecognized tag: refs/tags/v5.2.0-alpha-1
 {"profile": "emqx-enterprise", "release": true, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/tags/e5.3.0-alpha.1
+>>>
+{"profile": "emqx-enterprise", "release": true, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/tags/e5.2.0-beta.1
 >>>
 {"profile": "emqx-enterprise", "release": true, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/tags/e5.3.0-beta.1
+>>>
+{"profile": "emqx-enterprise", "release": true, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/tags/e5.2.0-rc.1
 >>>
 {"profile": "emqx-enterprise", "release": true, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/tags/e5.3.0-rc.1
+>>>
+{"profile": "emqx-enterprise", "release": true, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/tags/e5.1.99
 >>>
 {"profile": "emqx-enterprise", "release": true, "latest": true}
@@ -98,6 +143,11 @@ Unrecognized tag: refs/tags/v5.2.0-alpha-1
 {"profile": "emqx-enterprise", "release": false, "latest": false}
 >>>= 0
 
+./parse-git-ref.sh refs/heads/release-53
+>>>
+{"profile": "emqx-enterprise", "release": false, "latest": false}
+>>>= 0
+
 ./parse-git-ref.sh refs/heads/ci/foobar
 >>>
 {"profile": "emqx", "release": false, "latest": false}