Browse Source

Merge remote-tracking branch 'origin/release-50' into 1209-chore-merge-dev/ee5.0-to-release-50

Zaiming (Stone) Shi 3 years ago
parent
commit
6a4fb1241b

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -27,6 +27,7 @@
 {emqx_prometheus,1}.
 {emqx_resource,1}.
 {emqx_retainer,1}.
+{emqx_retainer,2}.
 {emqx_rule_engine,1}.
 {emqx_shared_sub,1}.
 {emqx_slow_subs,1}.

+ 1 - 1
apps/emqx/rebar.config

@@ -27,7 +27,7 @@
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
-    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}},
+    {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},

+ 27 - 3
apps/emqx/src/emqx_access_control.erl

@@ -46,16 +46,32 @@ authenticate(Credential) ->
     NotSuperUser = #{is_superuser => false},
     case emqx_authentication:pre_hook_authenticate(Credential) of
         ok ->
+            inc_authn_metrics(anonymous),
             {ok, NotSuperUser};
         continue ->
-            case run_hooks('client.authenticate', [Credential], {ok, #{is_superuser => false}}) of
+            case run_hooks('client.authenticate', [Credential], ignore) of
+                ignore ->
+                    inc_authn_metrics(anonymous),
+                    {ok, NotSuperUser};
                 ok ->
+                    inc_authn_metrics(ok),
                     {ok, NotSuperUser};
+                {ok, _AuthResult} = OkResult ->
+                    inc_authn_metrics(ok),
+                    OkResult;
+                {ok, _AuthResult, _AuthData} = OkResult ->
+                    inc_authn_metrics(ok),
+                    OkResult;
+                {error, _Reason} = Error ->
+                    inc_authn_metrics(error),
+                    Error;
+                %% {continue, AuthCache} | {continue, AuthData, AuthCache}
                 Other ->
                     Other
             end;
-        Other ->
-            Other
+        {error, _Reason} = Error ->
+            inc_authn_metrics(error),
+            Error
     end.
 
 %% @doc Check Authorization
@@ -134,3 +150,11 @@ inc_authz_metrics(deny) ->
     emqx_metrics:inc('authorization.deny');
 inc_authz_metrics(cache_hit) ->
     emqx_metrics:inc('authorization.cache_hit').
+
+inc_authn_metrics(error) ->
+    emqx_metrics:inc('authentication.failure');
+inc_authn_metrics(ok) ->
+    emqx_metrics:inc('authentication.success');
+inc_authn_metrics(anonymous) ->
+    emqx_metrics:inc('authentication.success.anonymous'),
+    emqx_metrics:inc('authentication.success').

+ 5 - 23
apps/emqx/src/emqx_authentication.erl

@@ -228,7 +228,6 @@ when
 -spec pre_hook_authenticate(emqx_types:clientinfo()) ->
     ok | continue | {error, not_authorized}.
 pre_hook_authenticate(#{enable_authn := false}) ->
-    inc_authenticate_metric('authentication.success.anonymous'),
     ?TRACE_RESULT("authentication_result", ok, enable_authn_false);
 pre_hook_authenticate(#{enable_authn := quick_deny_anonymous} = Credential) ->
     case maps:get(username, Credential, undefined) of
@@ -242,29 +241,18 @@ pre_hook_authenticate(#{enable_authn := quick_deny_anonymous} = Credential) ->
 pre_hook_authenticate(_) ->
     continue.
 
-authenticate(#{listener := Listener, protocol := Protocol} = Credential, _AuthResult) ->
+authenticate(#{listener := Listener, protocol := Protocol} = Credential, AuthResult) ->
     case get_authenticators(Listener, global_chain(Protocol)) of
         {ok, ChainName, Authenticators} ->
             case get_enabled(Authenticators) of
                 [] ->
-                    inc_authenticate_metric('authentication.success.anonymous'),
-                    ?TRACE_RESULT("authentication_result", ignore, empty_chain);
+                    ?TRACE_RESULT("authentication_result", AuthResult, empty_chain);
                 NAuthenticators ->
                     Result = do_authenticate(ChainName, NAuthenticators, Credential),
-
-                    case Result of
-                        {stop, {ok, _}} ->
-                            inc_authenticate_metric('authentication.success');
-                        {stop, {error, _}} ->
-                            inc_authenticate_metric('authentication.failure');
-                        _ ->
-                            ok
-                    end,
                     ?TRACE_RESULT("authentication_result", Result, chain_result)
             end;
         none ->
-            inc_authenticate_metric('authentication.success.anonymous'),
-            ?TRACE_RESULT("authentication_result", ignore, no_chain)
+            ?TRACE_RESULT("authentication_result", AuthResult, no_chain)
     end.
 
 get_authenticators(Listener, Global) ->
@@ -649,7 +637,7 @@ handle_create_authenticator(Chain, Config, Providers) ->
     end.
 
 do_authenticate(_ChainName, [], _) ->
-    {stop, {error, not_authorized}};
+    {ok, {error, not_authorized}};
 do_authenticate(
     ChainName, [#authenticator{id = ID} = Authenticator | More], Credential
 ) ->
@@ -673,7 +661,7 @@ do_authenticate(
                 _ ->
                     ok
             end,
-            {stop, Result}
+            {ok, Result}
     catch
         Class:Reason:Stacktrace ->
             ?TRACE_AUTHN(warning, "authenticator_error", #{
@@ -947,9 +935,3 @@ to_list(M) when is_map(M) -> [M];
 to_list(L) when is_list(L) -> L.
 
 call(Call) -> gen_server:call(?MODULE, Call, infinity).
-
-inc_authenticate_metric('authentication.success.anonymous' = Metric) ->
-    emqx_metrics:inc(Metric),
-    emqx_metrics:inc('authentication.success');
-inc_authenticate_metric(Metric) ->
-    emqx_metrics:inc(Metric).

+ 5 - 0
apps/emqx/src/emqx_listeners.erl

@@ -199,6 +199,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             Reason =:= listener_disabled;
             Reason =:= quic_app_missing
         ->
+            ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
             console_print(
                 "Listener ~ts is NOT started due to: ~p.~n",
                 [listener_id(Type, ListenerName), Reason]
@@ -212,8 +213,12 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             ),
             ok;
         {error, {already_started, Pid}} ->
+            ?tp(listener_not_started, #{
+                type => Type, bind => Bind, status => {already_started, Pid}
+            }),
             {error, {already_started, Pid}};
         {error, Reason} ->
+            ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
             ListenerId = listener_id(Type, ListenerName),
             BindStr = format_bind(Bind),
             ?ELOG(

+ 119 - 0
apps/emqx/test/emqx_authentication_SUITE.erl

@@ -22,6 +22,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/emqx_hooks.hrl").
+
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("typerefl/include/types.hrl").
@@ -35,6 +37,20 @@
     end)()
 ).
 -define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
+-define(NOT_SUPERUSER, #{is_superuser => false}).
+
+-define(assertAuthSuccessForUser(User),
+    ?assertMatch(
+        {ok, _},
+        emqx_access_control:authenticate(ClientInfo#{username => atom_to_binary(User)})
+    )
+).
+-define(assertAuthFailureForUser(User),
+    ?assertMatch(
+        {error, _},
+        emqx_access_control:authenticate(ClientInfo#{username => atom_to_binary(User)})
+    )
+).
 
 %%------------------------------------------------------------------------------
 %% Hocon Schema
@@ -88,9 +104,22 @@ update(_Config, _State) ->
 
 authenticate(#{username := <<"good">>}, _State) ->
     {ok, #{is_superuser => true}};
+authenticate(#{username := <<"ignore">>}, _State) ->
+    ignore;
 authenticate(#{username := _}, _State) ->
     {error, bad_username_or_password}.
 
+hook_authenticate(#{username := <<"hook_user_good">>}, _AuthResult) ->
+    {ok, {ok, ?NOT_SUPERUSER}};
+hook_authenticate(#{username := <<"hook_user_bad">>}, _AuthResult) ->
+    {ok, {error, invalid_username}};
+hook_authenticate(#{username := <<"hook_user_finally_good">>}, _AuthResult) ->
+    {stop, {ok, ?NOT_SUPERUSER}};
+hook_authenticate(#{username := <<"hook_user_finally_bad">>}, _AuthResult) ->
+    {stop, {error, invalid_username}};
+hook_authenticate(_ClientId, AuthResult) ->
+    {ok, AuthResult}.
+
 destroy(_State) ->
     ok.
 
@@ -113,6 +142,10 @@ end_per_testcase(Case, Config) ->
     _ = ?MODULE:Case({'end', Config}),
     ok.
 
+%%=================================================================================
+%% Testcases
+%%=================================================================================
+
 t_chain({'init', Config}) ->
     Config;
 t_chain(Config) when is_list(Config) ->
@@ -500,6 +533,92 @@ t_convert_certs(Config) when is_list(Config) ->
     clear_certs(CertsDir, #{<<"ssl">> => NCerts3}),
     ?assertEqual(false, filelib:is_regular(maps:get(<<"keyfile">>, NCerts3))).
 
+t_combine_authn_and_callback({init, Config}) ->
+    [
+        {listener_id, 'tcp:default'},
+        {authn_type, {password_based, built_in_database}}
+        | Config
+    ];
+t_combine_authn_and_callback(Config) when is_list(Config) ->
+    ListenerID = ?config(listener_id),
+    ClientInfo = #{
+        zone => default,
+        listener => ListenerID,
+        protocol => mqtt,
+        password => <<"any">>
+    },
+
+    %% no emqx_authentication authenticators, anonymous is allowed
+    ?assertAuthSuccessForUser(bad),
+
+    AuthNType = ?config(authn_type),
+    register_provider(AuthNType, ?MODULE),
+
+    AuthenticatorConfig = #{
+        mechanism => password_based,
+        backend => built_in_database,
+        enable => true
+    },
+    {ok, _} = ?AUTHN:create_authenticator(ListenerID, AuthenticatorConfig),
+
+    %% emqx_authentication alone
+    ?assertAuthSuccessForUser(good),
+    ?assertAuthFailureForUser(ignore),
+    ?assertAuthFailureForUser(bad),
+
+    %% add hook with higher priority
+    ok = hook(?HP_AUTHN + 1),
+
+    %% for hook unrelataed users everything is the same
+    ?assertAuthSuccessForUser(good),
+    ?assertAuthFailureForUser(ignore),
+    ?assertAuthFailureForUser(bad),
+
+    %% higher-priority hook can permit access with {ok,...},
+    %% then emqx_authentication overrides the result
+    ?assertAuthFailureForUser(hook_user_good),
+    ?assertAuthFailureForUser(hook_user_bad),
+
+    %% higher-priority hook can permit and return {stop,...},
+    %% then emqx_authentication cannot override the result
+    ?assertAuthSuccessForUser(hook_user_finally_good),
+    ?assertAuthFailureForUser(hook_user_finally_bad),
+
+    ok = unhook(),
+
+    %% add hook with lower priority
+    ok = hook(?HP_AUTHN - 1),
+
+    %% for hook unrelataed users
+    ?assertAuthSuccessForUser(good),
+    ?assertAuthFailureForUser(bad),
+    ?assertAuthFailureForUser(ignore),
+
+    %% lower-priority hook can overrride auth result,
+    %% because emqx_authentication permits/denies with {ok, ...}
+    ?assertAuthSuccessForUser(hook_user_good),
+    ?assertAuthFailureForUser(hook_user_bad),
+    ?assertAuthSuccessForUser(hook_user_finally_good),
+    ?assertAuthFailureForUser(hook_user_finally_bad),
+
+    ok = unhook();
+t_combine_authn_and_callback({'end', Config}) ->
+    ?AUTHN:delete_chain(?config(listener_id)),
+    ?AUTHN:deregister_provider(?config(authn_type)),
+    ok.
+
+%%=================================================================================
+%% Helpers fns
+%%=================================================================================
+
+hook(Priority) ->
+    ok = emqx_hooks:put(
+        'client.authenticate', {?MODULE, hook_authenticate, []}, Priority
+    ).
+
+unhook() ->
+    ok = emqx_hooks:del('client.authenticate', {?MODULE, hook_authenticate}).
+
 update_config(Path, ConfigRequest) ->
     emqx:update_config(Path, ConfigRequest, #{rawconf_with_defaults => true}).
 

+ 3 - 0
apps/emqx/test/emqx_trace_SUITE.erl

@@ -43,6 +43,9 @@ init_per_suite(Config) ->
             timer:seconds(100)
         ),
         fun(Trace) ->
+            ct:pal("listener start statuses: ~p", [
+                ?of_kind([listener_started, listener_not_started], Trace)
+            ]),
             %% more than one listener
             ?assertMatch([_ | _], ?of_kind(listener_started, Trace))
         end

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.7"},
+    {vsn, "5.0.8"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx]},

+ 117 - 92
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -38,11 +38,9 @@
 
 %% Internal exports (RPC)
 -export([
-    do_store_retained/1,
-    do_clear_expired/0,
-    do_delete_message/1,
     do_populate_index_meta/1,
-    do_reindex_batch/2
+    do_reindex_batch/2,
+    active_indices/0
 ]).
 
 %% Management API:
@@ -66,6 +64,8 @@
 -define(CLEAR_BATCH_SIZE, 1000).
 -define(REINDEX_BATCH_SIZE, 1000).
 -define(REINDEX_DISPATCH_WAIT, 30000).
+-define(REINDEX_RPC_RETRY_INTERVAL, 1000).
+-define(REINDEX_INDEX_UPDATE_WAIT, 30000).
 
 %%--------------------------------------------------------------------
 %% Management API
@@ -136,64 +136,41 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
     end.
 
 store_retained(_, Msg = #message{topic = Topic}) ->
-    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of
-        {atomic, ok} ->
-            ?tp(debug, message_retained, #{topic => Topic}),
-            ok;
-        {aborted, Reason} ->
+    ExpiryTime = emqx_retainer:get_expiry_time(Msg),
+    Tokens = topic_to_tokens(Topic),
+    case is_table_full() andalso is_new_topic(Tokens) of
+        true ->
             ?SLOG(error, #{
                 msg => "failed_to_retain_message",
                 topic => Topic,
-                reason => Reason
-            })
-    end.
-
-do_store_retained(#message{topic = Topic} = Msg) ->
-    ExpiryTime = emqx_retainer:get_expiry_time(Msg),
-    Tokens = topic_to_tokens(Topic),
-    case is_table_full() of
+                reason => table_is_full
+            });
         false ->
-            store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
-        _ ->
-            case mnesia:read(?TAB_MESSAGE, Tokens, write) of
-                [_] ->
-                    store_retained(db_indices(write), Msg, Tokens, ExpiryTime);
-                [] ->
-                    mnesia:abort(table_is_full)
-            end
+            do_store_retained(Msg, Tokens, ExpiryTime)
     end.
 
 clear_expired(_) ->
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0),
-    ok.
-
-do_clear_expired() ->
     NowMs = erlang:system_time(millisecond),
     QH = qlc:q([
-        TopicTokens
+        RetainedMsg
      || #retained_message{
-            topic = TopicTokens,
             expiry_time = ExpiryTime
-        } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]),
+        } = RetainedMsg <- ets:table(?TAB_MESSAGE),
         (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
     ]),
     QC = qlc:cursor(QH),
-    clear_batch(db_indices(write), QC).
+    clear_batch(dirty_indices(write), QC).
 
 delete_message(_, Topic) ->
-    {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]),
-    ok.
-
-do_delete_message(Topic) ->
     Tokens = topic_to_tokens(Topic),
     case emqx_topic:wildcard(Topic) of
         false ->
-            ok = delete_message_by_topic(Tokens, db_indices(write));
+            ok = delete_message_by_topic(Tokens, dirty_indices(write));
         true ->
-            QH = topic_search_table(Tokens),
+            QH = search_table(Tokens, 0),
             qlc:fold(
-                fun(TopicTokens, _) ->
-                    ok = delete_message_by_topic(TopicTokens, db_indices(write))
+                fun(RetainedMsg, _) ->
+                    ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
                 end,
                 undefined,
                 QH
@@ -206,7 +183,7 @@ read_message(_, Topic) ->
 match_messages(_, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
-    QH = search_table(Tokens, Now),
+    QH = msg_table(search_table(Tokens, Now)),
     case batch_read_number() of
         all_remaining ->
             {ok, qlc:eval(QH), undefined};
@@ -227,10 +204,10 @@ page_read(_, Topic, Page, Limit) ->
     QH =
         case Topic of
             undefined ->
-                search_table(undefined, ['#'], Now);
+                msg_table(search_table(undefined, ['#'], Now));
             _ ->
                 Tokens = topic_to_tokens(Topic),
-                search_table(Tokens, Now)
+                msg_table(search_table(Tokens, Now))
         end,
     OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
     Cursor = qlc:cursor(OrderedQH),
@@ -281,49 +258,49 @@ reindex_status() ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-store_retained(Indices, Msg, Tokens, ExpiryTime) ->
-    ok = store_retained_message(Msg, Tokens, ExpiryTime),
-    ok = emqx_retainer_index:foreach_index_key(
-        fun(Key) -> store_retained_index(Key, ExpiryTime) end,
-        Indices,
-        Tokens
-    ).
-
-store_retained_message(Msg, Tokens, ExpiryTime) ->
+do_store_retained(Msg, TopicTokens, ExpiryTime) ->
+    %% Retained message is stored syncronously on all core nodes
+    ok = do_store_retained_message(Msg, TopicTokens, ExpiryTime),
+    %% Since retained message was stored syncronously on all core nodes,
+    %% now we are sure that
+    %% * either we will write correct indices
+    %% * or if we a replicant with outdated write indices due to reindexing,
+    %%   the correct indices will be added by reindexing
+    ok = do_store_retained_indices(TopicTokens, ExpiryTime).
+
+do_store_retained_message(Msg, TopicTokens, ExpiryTime) ->
     RetainedMessage = #retained_message{
-        topic = Tokens,
+        topic = TopicTokens,
         msg = Msg,
         expiry_time = ExpiryTime
     },
-    mnesia:write(?TAB_MESSAGE, RetainedMessage, write).
+    ok = mria:dirty_write_sync(?TAB_MESSAGE, RetainedMessage).
 
-store_retained_index(Key, ExpiryTime) ->
+do_store_retained_indices(TopicTokens, ExpiryTime) ->
+    Indices = dirty_indices(write),
+    ok = emqx_retainer_index:foreach_index_key(
+        fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
+        Indices,
+        TopicTokens
+    ).
+
+do_store_retained_index(Key, ExpiryTime) ->
     RetainedIndex = #retained_index{
         key = Key,
         expiry_time = ExpiryTime
     },
-    mnesia:write(?TAB_INDEX, RetainedIndex, write).
-
-topic_search_table(Tokens) ->
-    Index = emqx_retainer_index:select_index(Tokens, db_indices(read)),
-    topic_search_table(Index, Tokens).
+    mria:dirty_write(?TAB_INDEX, RetainedIndex).
 
-topic_search_table(undefined, Tokens) ->
-    Cond = emqx_retainer_index:condition(Tokens),
-    Ms = [{#retained_message{topic = Cond, msg = '_', expiry_time = '_'}, [], ['$_']}],
-    MsgQH = mnesia:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]),
-    qlc:q([Topic || #retained_message{topic = Topic} <- MsgQH]);
-topic_search_table(Index, Tokens) ->
-    Cond = emqx_retainer_index:condition(Index, Tokens),
-    Ms = [{#retained_index{key = Cond, expiry_time = '_'}, [], ['$_']}],
-    IndexQH = mnesia:table(?TAB_INDEX, [{traverse, {select, Ms}}]),
+msg_table(SearchTable) ->
     qlc:q([
-        emqx_retainer_index:restore_topic(Key)
-     || #retained_index{key = Key} <- IndexQH
+        Msg
+     || #retained_message{
+            msg = Msg
+        } <- SearchTable
     ]).
 
 search_table(Tokens, Now) ->
-    Indices = dirty_read_indices(),
+    Indices = dirty_indices(read),
     Index = emqx_retainer_index:select_index(Tokens, Indices),
     search_table(Index, Tokens, Now).
 
@@ -341,26 +318,21 @@ search_table(Index, Tokens, Now) ->
      || TopicTokens <- Topics
     ]),
     qlc:q([
-        Msg
+        RetainedMsg
      || [
             #retained_message{
-                msg = Msg,
                 expiry_time = ExpiryTime
-            }
+            } = RetainedMsg
         ] <- RetainedMsgQH,
         (ExpiryTime == 0) or (ExpiryTime > Now)
     ]).
 
-dirty_read_indices() ->
-    case ets:lookup(?TAB_INDEX_META, ?META_KEY) of
-        [#retained_index_meta{read_indices = ReadIndices}] -> ReadIndices;
-        [] -> []
-    end.
-
 clear_batch(Indices, QC) ->
     {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
     lists:foreach(
-        fun(TopicTokens) -> delete_message_by_topic(TopicTokens, Indices) end,
+        fun(RetainedMsg) ->
+            delete_message_with_indices(RetainedMsg, Indices)
+        end,
         Rows
     ),
     case Result of
@@ -369,14 +341,23 @@ clear_batch(Indices, QC) ->
     end.
 
 delete_message_by_topic(TopicTokens, Indices) ->
+    case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
+        [] -> ok;
+        [RetainedMsg] -> delete_message_with_indices(RetainedMsg, Indices)
+    end.
+
+delete_message_with_indices(RetainedMsg, Indices) ->
+    #retained_message{topic = TopicTokens, expiry_time = ExpiryTime} = RetainedMsg,
     ok = emqx_retainer_index:foreach_index_key(
         fun(Key) ->
-            mnesia:delete({?TAB_INDEX, Key})
+            mria:dirty_delete_object(?TAB_INDEX, #retained_index{
+                key = Key, expiry_time = ExpiryTime
+            })
         end,
         Indices,
         TopicTokens
     ),
-    ok = mnesia:delete({?TAB_MESSAGE, TopicTokens}).
+    ok = mria:dirty_delete_object(?TAB_MESSAGE, RetainedMsg).
 
 compare_message(M1, M2) ->
     M1#message.timestamp =< M2#message.timestamp.
@@ -415,20 +396,26 @@ qlc_next_answers(QC, N) ->
 
 make_message_match_spec(Tokens, NowMs) ->
     Cond = emqx_retainer_index:condition(Tokens),
-    MsHd = #retained_message{topic = Cond, msg = '$2', expiry_time = '$3'},
-    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$2']}].
+    MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
+    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
 
 make_index_match_spec(Index, Tokens, NowMs) ->
     Cond = emqx_retainer_index:condition(Index, Tokens),
     MsHd = #retained_index{key = Cond, expiry_time = '$3'},
     [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
 
--spec is_table_full() -> boolean().
 is_table_full() ->
     Limit = emqx:get_config([retainer, backend, max_retained_messages]),
     Limit > 0 andalso (table_size() >= Limit).
 
--spec table_size() -> non_neg_integer().
+is_new_topic(Tokens) ->
+    case mnesia:dirty_read(?TAB_MESSAGE, Tokens) of
+        [_] ->
+            false;
+        [] ->
+            true
+    end.
+
 table_size() ->
     mnesia:table_info(?TAB_MESSAGE, size).
 
@@ -486,8 +473,14 @@ do_populate_index_meta(ConfigIndices) ->
             )
     end.
 
+dirty_indices(Type) ->
+    indices(ets:lookup(?TAB_INDEX_META, ?META_KEY), Type).
+
 db_indices(Type) ->
-    case mnesia:read(?TAB_INDEX_META, ?META_KEY) of
+    indices(mnesia:read(?TAB_INDEX_META, ?META_KEY), Type).
+
+indices(IndexRecords, Type) ->
+    case IndexRecords of
         [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] ->
             case Type of
                 read -> ReadIndices;
@@ -506,10 +499,15 @@ batch_read_number() ->
 reindex(NewIndices, Force, StatusFun) when
     is_boolean(Force) andalso is_function(StatusFun, 1)
 ->
+    %% Do not run on replicants
+    core = mria_rlog:role(),
     %% Disable read indices and update write indices so that new records are written
     %% with correct indices. Also block parallel reindexing.
     case try_start_reindex(NewIndices, Force) of
         {atomic, ok} ->
+            %% Wait for all nodes to have new indices, including rlog nodes
+            true = wait_indices_updated({[], NewIndices}, ?REINDEX_INDEX_UPDATE_WAIT),
+
             %% Wait for all dispatch operations to be completed to avoid
             %% inconsistent results.
             true = wait_dispatch_complete(?REINDEX_DISPATCH_WAIT),
@@ -592,7 +590,7 @@ reindex_topic(Indices, Topic) ->
     case mnesia:read(?TAB_MESSAGE, Topic, read) of
         [#retained_message{expiry_time = ExpiryTime}] ->
             ok = emqx_retainer_index:foreach_index_key(
-                fun(Key) -> store_retained_index(Key, ExpiryTime) end,
+                fun(Key) -> do_store_retained_index(Key, ExpiryTime) end,
                 Indices,
                 Topic
             );
@@ -627,8 +625,35 @@ do_reindex_batch(QC, Done) ->
 
 wait_dispatch_complete(Timeout) ->
     Nodes = mria_mnesia:running_nodes(),
-    {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout),
+    {Results, []} = emqx_retainer_proto_v2:wait_dispatch_complete(Nodes, Timeout),
     lists:all(
         fun(Result) -> Result =:= ok end,
         Results
     ).
+
+wait_indices_updated(_Indices, TimeLeft) when TimeLeft < 0 -> false;
+wait_indices_updated(Indices, TimeLeft) ->
+    case timer:tc(fun() -> are_indices_updated(Indices) end) of
+        {_, true} ->
+            true;
+        {TimePassed, false} ->
+            timer:sleep(?REINDEX_RPC_RETRY_INTERVAL),
+            wait_indices_updated(
+                Indices, TimeLeft - ?REINDEX_RPC_RETRY_INTERVAL - TimePassed / 1000
+            )
+    end.
+
+active_indices() ->
+    {dirty_indices(read), dirty_indices(write)}.
+
+are_indices_updated(Indices) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case emqx_retainer_proto_v2:active_mnesia_indices(Nodes) of
+        {Results, []} ->
+            lists:all(
+                fun(NodeIndices) -> NodeIndices =:= Indices end,
+                Results
+            );
+        _ ->
+            false
+    end.

+ 33 - 21
apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl

@@ -50,26 +50,17 @@ retainer(["reindex", "status"]) ->
 retainer(["reindex", "start"]) ->
     retainer(["reindex", "start", "false"]);
 retainer(["reindex", "start", ForceParam]) ->
-    Force =
-        case ForceParam of
-            "true" -> true;
-            _ -> false
-        end,
-    ?PRINT_MSG("Starting reindexing~n"),
-    emqx_retainer_mnesia:reindex(
-        Force,
-        fun(Done) ->
-            ?SLOG(
-                info,
-                #{
-                    msg => "retainer_message_record_reindexing_progress",
-                    done => Done
-                }
-            ),
-            ?PRINT("Reindexed ~p messages~n", [Done])
-        end
-    ),
-    ?PRINT_MSG("Reindexing finished~n");
+    case mria_rlog:role() of
+        core ->
+            Force =
+                case ForceParam of
+                    "true" -> true;
+                    _ -> false
+                end,
+            do_reindex(Force);
+        replicant ->
+            ?PRINT_MSG("Can't run reindex on a replicant node")
+    end;
 retainer(_) ->
     emqx_ctl:usage(
         [
@@ -79,10 +70,31 @@ retainer(_) ->
             {"retainer clean <Topic>", "Clean retained messages by the specified topic filter"},
             {"retainer reindex status", "Show reindex status"},
             {"retainer reindex start [force]",
-                "Generate new retainer topic indices config settings.\n"
+                "Generate new retainer topic indices from config settings.\n"
                 "Pass true as <Force> to ignore previously started reindexing"}
         ]
     ).
 
 unload() ->
     ok = emqx_ctl:unregister_command(retainer).
+
+%%------------------------------------------------------------------------------
+%% Private
+%%------------------------------------------------------------------------------
+
+do_reindex(Force) ->
+    ?PRINT_MSG("Starting reindexing~n"),
+    emqx_retainer_mnesia:reindex(
+        Force,
+        fun(Done) ->
+            ?SLOG(
+                info,
+                #{
+                    msg => "retainer_message_record_reindexing_progress",
+                    done => Done
+                }
+            ),
+            ?PRINT("Reindexed ~p messages~n", [Done])
+        end
+    ),
+    ?PRINT_MSG("Reindexing finished~n").

+ 41 - 0
apps/emqx_retainer/src/proto/emqx_retainer_proto_v2.erl

@@ -0,0 +1,41 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_retainer_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-export([
+    introduced_in/0,
+    wait_dispatch_complete/2,
+    active_mnesia_indices/1
+]).
+
+-define(TIMEOUT, 5000).
+
+introduced_in() ->
+    "5.0.13".
+
+-spec wait_dispatch_complete(list(node()), timeout()) -> emqx_rpc:multicall_result(ok).
+wait_dispatch_complete(Nodes, Timeout) ->
+    rpc:multicall(Nodes, emqx_retainer_dispatcher, wait_dispatch_complete, [Timeout]).
+
+-spec active_mnesia_indices(list(node())) ->
+    emqx_rpc:multicall_result({list(emqx_retainer_index:index()), list(emqx_retainer_index:index())}).
+active_mnesia_indices(Nodes) ->
+    rpc:multicall(Nodes, emqx_retainer_mnesia, active_indices, [], ?TIMEOUT).

+ 19 - 0
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -318,6 +318,25 @@ t_message_expiry_2(_) ->
     end,
     with_conf(ConfMod, Case).
 
+t_table_full(_) ->
+    ConfMod = fun(Conf) ->
+        Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}}
+    end,
+    Case = fun() ->
+        {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+        {ok, _} = emqtt:connect(C1),
+        emqtt:publish(C1, <<"retained/t/1">>, <<"a">>, [{qos, 0}, {retain, true}]),
+        emqtt:publish(C1, <<"retained/t/2">>, <<"b">>, [{qos, 0}, {retain, true}]),
+
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/1">>, [{qos, 0}, {rh, 0}]),
+        ?assertEqual(1, length(receive_messages(1))),
+        {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/t/2">>, [{qos, 0}, {rh, 0}]),
+        ?assertEqual(0, length(receive_messages(1))),
+
+        ok = emqtt:disconnect(C1)
+    end,
+    with_conf(ConfMod, Case).
+
 t_clean(_) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),

+ 11 - 0
changes/v5.0.12-en.md

@@ -24,6 +24,8 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan
 
 - Add more PSK ciphers support [#9505](https://github.com/emqx/emqx/pull/9505).
 
+- Improve `emqx_retainer` write performance: get rid of transactions on write [#9372](https://github.com/emqx/emqx/pull/9372).
+
 - Upgrade dashboard to [v1.1.3](https://github.com/emqx/emqx-dashboard-web-new/releases/tag/v1.1.3).
 
 ## Bug fixes
@@ -35,3 +37,12 @@ Please note, the request body of `/bridges` API to configure MQTT brdige is chan
 - Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464).
 
 - Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477).
+
+- Fixed EMQX Helm Chart deployment error [#9509](https://github.com/emqx/emqx/pull/9509)
+
+  - Fixed the `Discovery error: no such service` error occurred during helm chart deployment, resulting in an abnormal discovery of cluster nodes.
+
+  - Fixed that caused EMQX Helm Chart to fail when modifying some of EMQX's configuration items via environment variables
+
+- Fix shadowing `'client.authenticate'` callbacks by `emqx_authenticator`. Now `emqx_authenticator`
+  passes execution to the further callbacks if none of the authenticators matches [#9496](https://github.com/emqx/emqx/pull/9496).

+ 0 - 29
changes/v5.0.12-en.md.orig

@@ -1,29 +0,0 @@
-# v5.0.12
-
-## Enhancements
-
-- Disable global garbage collection by `node.global_gc_interval = disabled` [#9418](https://github.com/emqx/emqx/pull/9418)。
-
-- Improve the CLI to avoid waste atom table when typing erros [#9416](https://github.com/emqx/emqx/pull/9416).
-
-- Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423).
-
-- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412).
-  Shared subscriptions are now part of the MQTT spec. Use `$share` instead.
-
-- Refactor authn API by replacing `POST /authentication/{id}/move` with `PUT /authentication/{id}/position/{position}`. [#9419](https://github.com/emqx/emqx/pull/9419).
-  Same is done for `/listeners/{listener_id}/authentication/id/...`.
-
-- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/).
-
-## Bug fixes
-
-- Fix that the obsolete SSL files aren't deleted after the ExHook config update [#9432](https://github.com/emqx/emqx/pull/9432).
-
-- Fix doc and schema for `/trace` API [#9468](https://github.com/emqx/emqx/pull/9468).
-
-<<<<<<< HEAD
-- Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464).
-=======
-- Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477).
->>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test)

+ 10 - 0
changes/v5.0.12-zh.md

@@ -23,6 +23,8 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换
 
 - 支持更多的 PSK 密码套件[#9505](https://github.com/emqx/emqx/pull/9505)。
 
+- 提高 `emqx_retainer` 写入性能:摆脱写入时的事务 [#9372](https://github.com/emqx/emqx/pull/9372)。
+
 - Dashboard 更新到 [v1.1.3](https://github.com/emqx/emqx-dashboard-web-new/releases/tag/v1.1.3).
 
 ## 修复
@@ -34,3 +36,11 @@ v5.0.11 或更早版本创建的配置文件,在新版本中会被自动转换
 - 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。
 
 - 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。
+
+- 修复了 EMQX Helm Chart 部署的一些问题 [#9509](https://github.com/emqx/emqx/pull/9509)
+
+  - 修复了 EMQX Helm Chart 部署时出现 `Discovery error: no such service` 错误,导致集群节点发现异常。
+
+  - 修复了 EMQX Helm Chart 通过环境变量修改部分 EMQX 的配置项时的错误
+
+- 通过 `emqx_authenticator` 修复隐藏 `'client.authenticate'` 回调。 现在 `emqx_authenticator` 如果没有任何验证器匹配,则将执行传递给进一步的回调 [#9496](https://github.com/emqx/emqx/pull/9496)。

+ 0 - 28
changes/v5.0.12-zh.md.orig

@@ -1,28 +0,0 @@
-# v5.0.12
-
-## 增强
-
-- 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。
-
-- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。
-
-- 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。
-
-- 支持在 Apple Silicon 架构下编译苹果系统的发行版本 [#9423](https://github.com/emqx/emqx/pull/9423)。
-
-- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。
-  共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`。
-
-- 重构认证 API,使用 `PUT /authentication/{id}/position/{position}` 代替了 `POST /authentication/{id}/move` [#9419](https://github.com/emqx/emqx/pull/9419)。
-
-## 修复
-
-- 修复 ExHook 更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9432](https://github.com/emqx/emqx/pull/9432)。
-
-- 修复 /trace API 的返回值格式和相关文档 [#9468](https://github.com/emqx/emqx/pull/9468)。
-
-<<<<<<< HEAD
-- 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。
-=======
-- 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。
->>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test)

+ 15 - 0
deploy/charts/emqx-enterprise/templates/configmap.yaml

@@ -10,10 +10,25 @@ metadata:
     app.kubernetes.io/instance: {{ .Release.Name }}
     app.kubernetes.io/managed-by: {{ .Release.Service }}
 data:
+  EMQX_NAME:  {{ .Release.Name }}
+  {{- if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY)  "k8s"  }}
+  EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"
+  EMQX_CLUSTER__K8S__SERVICE_NAME:  {{ include "emqx.fullname" . }}-headless
+  EMQX_CLUSTER__K8S__NAMESPACE: {{ .Release.Namespace }}
+  EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"
+  EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"
+  {{- else if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY)  "dns"  }}
+  EMQX_CLUSTER__DNS__NAME: "{{  include "emqx.fullname" . }}-headless.{{ .Release.Namespace }}.svc.cluster.local"
+  EMQX_CLUSTER__DNS__RECORD_TYPE: "srv"
+  {{- end -}}
   {{- range $index, $value := .Values.emqxConfig }}
   {{- if $value }}
   {{- $key := (regexReplaceAllLiteral "\\." (regexReplaceAllLiteral "EMQX[_\\.]" (upper (trimAll " " $index)) "") "__") }}
+  {{- if or (kindIs "map" $value) (kindIs "slice" $value) }}
+  {{ print "EMQX_" $key }}: {{ tpl (printf "%q" (toJson $value)) $ }}
+  {{- else }}
   {{ print "EMQX_" $key }}: "{{ tpl (printf "%v" $value) $ }}"
   {{- end }}
   {{- end }}
+  {{- end }}
 {{- end }}

+ 2 - 13
deploy/charts/emqx-enterprise/values.yaml

@@ -7,6 +7,8 @@ replicaCount: 3
 image:
   repository: emqx/emqx-enterprise
   pullPolicy: IfNotPresent
+  # Overrides the image tag whose default is the chart appVersion.
+  tag: ""
   ## Optionally specify an array of imagePullSecrets.
   ## Secrets must be manually created in the namespace.
   ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
@@ -92,19 +94,6 @@ initContainers: {}
 ## EMQX configuration item, see the documentation (https://hub.docker.com/r/emqx/emqx)
 emqxConfig:
   EMQX_CLUSTER__DISCOVERY_STRATEGY: "dns"
-  EMQX_CLUSTER__DNS__NAME: "{{ .Release.Name }}-headless.{{ .Release.Namespace }}.svc.cluster.local"
-  EMQX_CLUSTER__DNS__RECORD_TYPE: "srv"
-  # EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s"
-  # EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"
-  # EMQX_CLUSTER__K8S__SERVICE_NAME: "{{ .Release.Name }}-headless"
-  # EMQX_CLUSTER__K8S__NAMESPACE: "{{ .Release.Namespace }}"
-  ## The address type is used to extract host from k8s service.
-  ## Value: ip | dns | hostname
-  ## Note:Hostname is only supported after v4.0-rc.2
-  EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"
-  EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"
-  ## if EMQX_CLUSTER__K8S__ADDRESS_TYPE eq dns
-  # EMQX_CLUSTER__K8S__SUFFIX: "pod.cluster.local"
   EMQX_DASHBOARD__DEFAULT_USERNAME: "admin"
   EMQX_DASHBOARD__DEFAULT_PASSWORD: "public"
 

+ 15 - 0
deploy/charts/emqx/templates/configmap.yaml

@@ -10,10 +10,25 @@ metadata:
     app.kubernetes.io/instance: {{ .Release.Name }}
     app.kubernetes.io/managed-by: {{ .Release.Service }}
 data:
+  EMQX_NAME:  {{ .Release.Name }}
+  {{- if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY)  "k8s"  }}
+  EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"
+  EMQX_CLUSTER__K8S__SERVICE_NAME:  {{ include "emqx.fullname" . }}-headless
+  EMQX_CLUSTER__K8S__NAMESPACE: {{ .Release.Namespace }}
+  EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"
+  EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"
+  {{- else if eq (.Values.emqxConfig.EMQX_CLUSTER__DISCOVERY_STRATEGY)  "dns"  }}
+  EMQX_CLUSTER__DNS__NAME: "{{  include "emqx.fullname" . }}-headless.{{ .Release.Namespace }}.svc.cluster.local"
+  EMQX_CLUSTER__DNS__RECORD_TYPE: "srv"
+  {{- end -}}
   {{- range $index, $value := .Values.emqxConfig }}
   {{- if $value }}
   {{- $key := (regexReplaceAllLiteral "\\." (regexReplaceAllLiteral "EMQX[_\\.]" (upper (trimAll " " $index)) "") "__") }}
+  {{- if or (kindIs "map" $value) (kindIs "slice" $value) }}
+  {{ print "EMQX_" $key }}: {{ tpl (printf "%q" (toJson $value)) $ }}
+  {{- else }}
   {{ print "EMQX_" $key }}: "{{ tpl (printf "%v" $value) $ }}"
   {{- end }}
   {{- end }}
+  {{- end }}
 {{- end }}

+ 0 - 13
deploy/charts/emqx/values.yaml

@@ -94,19 +94,6 @@ initContainers: {}
 ## EMQX configuration item, see the documentation (https://hub.docker.com/r/emqx/emqx)
 emqxConfig:
   EMQX_CLUSTER__DISCOVERY_STRATEGY: "dns"
-  EMQX_CLUSTER__DNS__NAME: "{{ .Release.Name }}-headless.{{ .Release.Namespace }}.svc.cluster.local"
-  EMQX_CLUSTER__DNS__RECORD_TYPE: "srv"
-  # EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s"
-  # EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"
-  # EMQX_CLUSTER__K8S__SERVICE_NAME: "{{ .Release.Name }}-headless"
-  # EMQX_CLUSTER__K8S__NAMESPACE: "{{ .Release.Namespace }}"
-  ## The address type is used to extract host from k8s service.
-  ## Value: ip | dns | hostname
-  ## Note:Hostname is only supported after v4.0-rc.2
-  EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"
-  EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"
-  ## if EMQX_CLUSTER__K8S__ADDRESS_TYPE eq dns
-  # EMQX_CLUSTER__K8S__SUFFIX: "pod.cluster.local"
   EMQX_DASHBOARD__DEFAULT_USERNAME: "admin"
   EMQX_DASHBOARD__DEFAULT_PASSWORD: "public"
 

+ 1 - 1
mix.exs

@@ -52,7 +52,7 @@ defmodule EMQXUmbrella.MixProject do
       {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
-      {:ekka, github: "emqx/ekka", tag: "0.13.6", override: true},
+      {:ekka, github: "emqx/ekka", tag: "0.13.7", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},

+ 1 - 1
rebar.config

@@ -54,7 +54,7 @@
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
-    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.7"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}