Explorar o código

feat(sessds): implement dispatchig between CE/EE shared sub agents

Ilya Averyanov hai 1 ano
pai
achega
b075b7120c

+ 14 - 10
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -367,7 +367,7 @@ print_session(ClientId) ->
 %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
 %%--------------------------------------------------------------------
 
-%% Suppress warnings about clauses handling unimplemented reuslts
+%% Suppress warnings about clauses handling unimplemented results
 %% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3`
 -dialyzer({nowarn_function, subscribe/3}).
 -spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
@@ -378,9 +378,9 @@ subscribe(
     Session
 ) ->
     case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of
-        {ok, S1} ->
-            S = emqx_persistent_session_ds_state:commit(S1),
-            {ok, Session#{s => S}};
+        {ok, S0, SharedSubS} ->
+            S = emqx_persistent_session_ds_state:commit(S0),
+            {ok, Session#{s => S, shared_sub_s => SharedSubS}};
         Error = {error, _} ->
             Error
     end;
@@ -397,20 +397,24 @@ subscribe(
             Error
     end.
 
-%% Suppress warnings about clauses handling unimplemented reuslts
-%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/4`
+%% Suppress warnings about clauses handling unimplemented results
+%% of `emqx_persistent_session_ds_shared_subs:on_unsubscribe/4`
 -dialyzer({nowarn_function, unsubscribe/2}).
 -spec unsubscribe(topic_filter(), session()) ->
     {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
 unsubscribe(
     #share{} = TopicFilter,
-    Session = #{id := SessionId, s := S0}
+    Session = #{id := SessionId, s := S0, shared_sub_s := SharedSubS0}
 ) ->
-    case emqx_persistent_session_ds_shared_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
-        {ok, S1, #{id := SubId, subopts := SubOpts}} ->
+    case
+        emqx_persistent_session_ds_shared_subs:on_unsubscribe(
+            SessionId, TopicFilter, S0, SharedSubS0
+        )
+    of
+        {ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} ->
             S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
             S = emqx_persistent_session_ds_state:commit(S2),
-            {ok, Session#{s => S}, SubOpts};
+            {ok, Session#{s => S, shared_sub_s => SharedSubS1}, SubOpts};
         Error = {error, _} ->
             Error
     end;

+ 28 - 17
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -4,7 +4,8 @@
 
 -module(emqx_persistent_session_ds_shared_subs).
 
--include_lib("emqx_mqtt.hrl").
+-include("emqx_mqtt.hrl").
+-include("logger.hrl").
 -include("session_internals.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
@@ -46,7 +47,7 @@
 %% API
 %%--------------------------------------------------------------------
 
--spec new(emqx_persistent_session_ds:shared_sub_opts()) -> t().
+-spec new(opts()) -> t().
 new(Opts) ->
     #{
         agent => emqx_persistent_session_ds_shared_subs_agent:new(
@@ -54,7 +55,7 @@ new(Opts) ->
         )
     }.
 
--spec open(emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:shared_sub_opts()) ->
+-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
     {ok, emqx_persistent_session_ds_state:t(), t()}.
 open(S, Opts) ->
     SharedSubscriptions = fold_shared_subs(
@@ -71,10 +72,9 @@ open(S, Opts) ->
     {ok, S, SharedSubS}.
 
 -spec on_subscribe(
-    emqx_persistent_session_ds_state:t(),
-    t(),
     share_topic_filter(),
-    emqx_types:subopts()
+    emqx_types:subopts(),
+    emqx_persistent_session_ds:session()
 ) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
 on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
     Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
@@ -110,6 +110,10 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
     {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
         Agent0
     ),
+    NewLeasedStreams =/= [] andalso
+        ?SLOG(
+            info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams}
+        ),
     S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams),
     S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams),
     SharedSubS1 = SharedSubS0#{agent => Agent1},
@@ -118,7 +122,7 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
 -spec on_streams_replayed(
     emqx_persistent_session_ds_state:t(),
     t()
-) -> t().
+) -> {emqx_persistent_session_ds_state:t(), t()}.
 on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
     %% TODO
     %% Is it sufficient for a report?
@@ -208,6 +212,7 @@ on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Sessio
 on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
     update_subscription(Subscription, TopicFilter, SubOpts, Session).
 
+-dialyzer({nowarn_function, create_new_subscription/3}).
 create_new_subscription(TopicFilter, SubOpts, #{
     id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
 }) ->
@@ -286,17 +291,22 @@ accept_stream(
             %% and should not have passed this stream as a new one
             error(new_stream_without_sub);
         #{id := SubId, current_state := SStateId} ->
-            NewSRS =
-                #srs{
-                    rank_x = ?rank_x,
-                    rank_y = ?rank_y,
-                    it_begin = Iterator,
-                    it_end = Iterator,
-                    sub_state_id = SStateId
-                },
             Key = {SubId, Stream},
-            S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
-            S1
+            case emqx_persistent_session_ds_state:get_stream(Key, S0) of
+                undefined ->
+                    NewSRS =
+                        #srs{
+                            rank_x = ?rank_x,
+                            rank_y = ?rank_y,
+                            it_begin = Iterator,
+                            it_end = Iterator,
+                            sub_state_id = SStateId
+                        },
+                    S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
+                    S1;
+                _SRS ->
+                    S0
+            end
     end.
 
 revoke_stream(
@@ -364,5 +374,6 @@ send_after_from_agent(SendAfter, Time, Dest, Msg) ->
             SendAfter(Time, Dest, Msg)
     end.
 
+-dialyzer({nowarn_function, now_ms/0}).
 now_ms() ->
     erlang:system_time(millisecond).

+ 28 - 20
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl

@@ -4,6 +4,8 @@
 
 -module(emqx_persistent_session_ds_shared_subs_agent).
 
+-include("shared_subs_agent.hrl").
+
 -type session_id() :: emqx_persistent_session_ds:id().
 
 -type subscription() :: #{
@@ -57,48 +59,54 @@
 
     on_subscribe/3,
     on_unsubscribe/2,
-    on_session_drop/1,
     on_stream_progress/2,
     on_info/2,
 
     renew_streams/1
 ]).
 
+%%--------------------------------------------------------------------
+%% Behaviour
+%%--------------------------------------------------------------------
+
+-callback new(opts()) -> t().
+-callback open([{topic_filter(), subscription()}], opts()) -> t().
+-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
+    {ok, t()} | {error, term()}.
+-callback on_unsubscribe(t(), topic_filter()) -> t().
+-callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
+-callback on_stream_progress(t(), [stream_progress()]) -> t().
+-callback on_info(t(), term()) -> t().
 
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
 
 -spec new(opts()) -> t().
-new(_Opts) ->
-    undefined.
+new(Opts) ->
+    ?shared_subs_agent:new(Opts).
 
 -spec open([{topic_filter(), subscription()}], opts()) -> t().
-open(_Topics, _Opts) ->
-    undefined.
+open(Topics, Opts) ->
+    ?shared_subs_agent:open(Topics, Opts).
 
 -spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
-    {ok, t()} | {error, term()}.
-on_subscribe(Agent, _TopicFilter, _SubOpts) ->
-    % {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}
-    {ok, Agent}.
+    {ok, t()} | {error, emqx_types:reason_code()}.
+on_subscribe(Agent, TopicFilter, SubOpts) ->
+    ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
 
 -spec on_unsubscribe(t(), topic_filter()) -> t().
-on_unsubscribe(Agent, _TopicFilter) ->
-    Agent.
-
--spec on_session_drop(t()) -> t().
-on_session_drop(Agent) ->
-    Agent.
+on_unsubscribe(Agent, TopicFilter) ->
+    ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
 
 -spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
 renew_streams(Agent) ->
-    {[], [], Agent}.
+    ?shared_subs_agent:renew_streams(Agent).
 
 -spec on_stream_progress(t(), [stream_progress()]) -> t().
-on_stream_progress(Agent, _StreamProgress) ->
-    Agent.
+on_stream_progress(Agent, StreamProgress) ->
+    ?shared_subs_agent:on_stream_progress(Agent, StreamProgress).
 
 -spec on_info(t(), term()) -> t().
-on_info(Agent, _Info) ->
-    Agent.
+on_info(Agent, Info) ->
+    ?shared_subs_agent:on_info(Agent, Info).

+ 46 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds_shared_subs_null_agent).
+
+-include("emqx_mqtt.hrl").
+
+-export([
+    new/1,
+    open/2,
+
+    on_subscribe/3,
+    on_unsubscribe/2,
+    on_stream_progress/2,
+    on_info/2,
+
+    renew_streams/1
+]).
+
+-behaviour(emqx_persistent_session_ds_shared_subs_agent).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+new(_Opts) ->
+    undefined.
+
+open(_Topics, _Opts) ->
+    undefined.
+
+on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
+    {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
+
+on_unsubscribe(Agent, _TopicFilter) ->
+    Agent.
+
+renew_streams(Agent) ->
+    {[], [], Agent}.
+
+on_stream_progress(Agent, _StreamProgress) ->
+    Agent.
+
+on_info(Agent, _Info) ->
+    Agent.

+ 30 - 31
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl

@@ -16,7 +16,7 @@
 -module(emqx_persistent_session_ds_stream_scheduler).
 
 %% API:
--export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2, shuffle/1]).
+-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
 -export([renew_streams/1, on_unsubscribe/2]).
 
 %% behavior callbacks:
@@ -87,20 +87,22 @@ find_new_streams(S) ->
     %% after timeout?)
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    emqx_persistent_session_ds_state:fold_streams(
-        fun
-            (_Key, #srs{it_end = end_of_stream}, Acc) ->
-                Acc;
-            (Key, Stream, Acc) ->
-                case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
-                    true ->
-                        [{Key, Stream} | Acc];
-                    false ->
-                        Acc
-                end
-        end,
-        [],
-        S
+    shuffle(
+        emqx_persistent_session_ds_state:fold_streams(
+            fun
+                (_Key, #srs{it_end = end_of_stream}, Acc) ->
+                    Acc;
+                (Key, Stream, Acc) ->
+                    case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
+                        true ->
+                            [{Key, Stream} | Acc];
+                        false ->
+                            Acc
+                    end
+            end,
+            [],
+            S
+        )
     ).
 
 %% @doc This function makes the session aware of the new streams.
@@ -201,19 +203,6 @@ is_fully_acked(Srs, S) ->
     CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
     is_fully_acked(CommQos1, CommQos2, Srs).
 
--spec shuffle([A]) -> [A].
-shuffle(L0) ->
-    L1 = lists:map(
-        fun(A) ->
-            %% maybe topic/stream prioritization could be introduced here?
-            {rand:uniform(), A}
-        end,
-        L0
-    ),
-    L2 = lists:sort(L1),
-    {_, L} = lists:unzip(L2),
-    L.
-
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -222,9 +211,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
     Key = {SubId, Stream},
     case emqx_persistent_session_ds_state:get_stream(Key, S) of
         undefined ->
-            ?SLOG(debug, #{
-                msg => new_stream, key => Key, stream => Stream
-            }),
             case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
                 {ok, Iterator} ->
                     NewStreamState = #srs{
@@ -424,6 +410,19 @@ is_fully_acked(_, _, #srs{
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).
 
+-spec shuffle([A]) -> [A].
+shuffle(L0) ->
+    L1 = lists:map(
+        fun(A) ->
+            %% maybe topic/stream prioritization could be introduced here?
+            {rand:uniform(), A}
+        end,
+        L0
+    ),
+    L2 = lists:sort(L1),
+    {_, L} = lists:unzip(L2),
+    L.
+
 fold_proper_subscriptions(Fun, Acc, S) ->
     emqx_persistent_session_ds_state:fold_subscriptions(
         fun

+ 4 - 1
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl

@@ -172,7 +172,10 @@ on_session_drop(SessionId, S0) ->
     ok.
 
 %% @doc Remove subscription states that don't have a parent, and that
-%% don't have any unacked messages:
+%% don't have any unacked messages.
+%% TODO
+%% This function collects shared subs as well
+%% Move to a separate module to keep symmetry?
 -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
 gc(S0) ->
     %% Create a set of subscription states IDs referenced either by a

+ 34 - 0
apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-ifndef(SHARED_SUBS_AGENT_HRL).
+-define(SHARED_SUBS_AGENT_HRL, true).
+
+-ifdef(EMQX_RELEASE_EDITION).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+
+%% agent from BSL app
+% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
+%% Till full implementation we need to dispach to the null agent.
+%% It will report "not implemented" error for attempts to use shared subscriptions.
+-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
+
+%% -if(?EMQX_RELEASE_EDITION == ee).
+-else.
+
+-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
+
+%% -if(?EMQX_RELEASE_EDITION == ee).
+-endif.
+
+%% -ifdef(EMQX_RELEASE_EDITION).
+-else.
+
+-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
+
+%% -ifdef(EMQX_RELEASE_EDITION).
+-endif.
+
+-endif.

+ 156 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl

@@ -0,0 +1,156 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_agent).
+
+-include_lib("emqx/include/emqx_persistent_message.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+-export([
+    new/1,
+    open/2,
+
+    on_subscribe/3,
+    on_unsubscribe/2,
+    on_stream_progress/2,
+    on_info/2,
+
+    renew_streams/1
+]).
+
+-behaviour(emqx_persistent_session_ds_shared_subs_agent).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+new(Opts) ->
+    init_state(Opts).
+
+open(TopicSubscriptions, Opts) ->
+    State0 = init_state(Opts),
+    State1 = lists:foldl(
+        fun({ShareTopicFilter, #{start_time := StartTime}}, State) ->
+            add_subscription(State, ShareTopicFilter, StartTime)
+        end,
+        State0,
+        TopicSubscriptions
+    ),
+    State1.
+
+on_subscribe(State0, TopicFilter, _SubOpts) ->
+    StartTime = now_ms(),
+    State1 = add_subscription(State0, TopicFilter, StartTime),
+    {ok, State1}.
+
+on_unsubscribe(State, TopicFilter) ->
+    delete_subscription(State, TopicFilter).
+
+renew_streams(State0) ->
+    State1 = do_renew_streams(State0),
+    {State2, StreamLeases} = stream_leases(State1),
+    {StreamLeases, [], State2}.
+
+on_stream_progress(State, _StreamProgress) ->
+    State.
+
+on_info(State, _Info) ->
+    State.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+init_state(Opts) ->
+    SessionId = maps:get(session_id, Opts),
+    SendFuns = maps:get(send_funs, Opts),
+    Send = maps:get(send, SendFuns),
+    SendAfter = maps:get(send_after, SendFuns),
+    #{
+        session_id => SessionId,
+        send => Send,
+        end_after => SendAfter,
+        subscriptions => #{}
+    }.
+
+% send(State, Pid, Msg) ->
+%     Send = maps:get(send, State),
+%     Send(Pid, Msg).
+
+% send_after(State, Time, Pid, Msg) ->
+%     SendAfter = maps:get(send_after, State),
+%     SendAfter(Time, Pid, Msg).
+
+do_renew_streams(#{subscriptions := Subs0} = State0) ->
+    Subs1 = maps:map(
+        fun(
+            ShareTopicFilter,
+            #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub
+        ) ->
+            #share{topic = TopicFilterRaw} = ShareTopicFilter,
+            TopicFilter = emqx_topic:words(TopicFilterRaw),
+            {_, NewStreams} = lists:unzip(
+                emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
+            ),
+            {Streams1, NewLeases} = lists:foldl(
+                fun(Stream, {StreamsAcc, LeasesAcc}) ->
+                    case StreamsAcc of
+                        #{Stream := _} ->
+                            {StreamsAcc, LeasesAcc};
+                        _ ->
+                            {ok, It} = emqx_ds:make_iterator(
+                                ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
+                            ),
+                            StreamLease = #{
+                                topic_filter => ShareTopicFilter,
+                                stream => Stream,
+                                iterator => It
+                            },
+                            {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]}
+                    end
+                end,
+                {Streams0, []},
+                NewStreams
+            ),
+            Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases}
+        end,
+        Subs0
+    ),
+    State0#{subscriptions => Subs1}.
+
+delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) ->
+    #share{topic = TopicFilter} = ShareTopicFilter,
+    ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId),
+    Subs1 = maps:remove(ShareTopicFilter, Subs0),
+    State0#{subscriptions => Subs1}.
+
+stream_leases(#{subscriptions := Subs0} = State0) ->
+    {Subs1, StreamLeases} = lists:foldl(
+        fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) ->
+            {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]}
+        end,
+        {Subs0, []},
+        maps:to_list(Subs0)
+    ),
+    State1 = State0#{subscriptions => Subs1},
+    {State1, lists:concat(StreamLeases)}.
+
+now_ms() ->
+    erlang:system_time(millisecond).
+
+add_subscription(
+    #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime
+) ->
+    #share{topic = TopicFilter} = ShareTopicFilter,
+    ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
+    Subs1 = Subs0#{
+        ShareTopicFilter => #{
+            start_time => StartTime,
+            streams => #{},
+            stream_leases => []
+        }
+    },
+    State1 = State0#{subscriptions => Subs1},
+    State1.

+ 0 - 5
apps/emqx_ds_shared_sub/src/emqx_ds_shared_subs.erl

@@ -1,5 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
--module(emqx_ds_shared_subs).