Procházet zdrojové kódy

fix(ds): Apply review remarks

ieQu1 před 1 rokem
rodič
revize
ee9f89b0a6

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -737,7 +737,7 @@ handle_info(#new_stream_event{subref = Ref}, Session, _ClientInfo) ->
         #{Ref := TopicFilter} ->
         #{Ref := TopicFilter} ->
             renew_streams(TopicFilter, Session);
             renew_streams(TopicFilter, Session);
         _ ->
         _ ->
-            ?tp(warning, sessds_unexpected_stream_notifiction, #{ref => Ref}),
+            ?tp(warning, sessds_unexpected_stream_notification, #{ref => Ref}),
             Session
             Session
     end;
     end;
 handle_info(Msg, Session, _ClientInfo) ->
 handle_info(Msg, Session, _ClientInfo) ->

+ 6 - 6
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl

@@ -88,8 +88,8 @@
 open(S) ->
 open(S) ->
     %% Receive notifications about new streams for the topic filter:
     %% Receive notifications about new streams for the topic filter:
     fold_private_subscriptions(
     fold_private_subscriptions(
-        fun(TopicFilter, _Sub, Acc) ->
-            Acc#{watch_streams(TopicFilter) => TopicFilter}
+        fun(TopicFilterBin, _Sub, Acc) ->
+            Acc#{watch_streams(TopicFilterBin) => TopicFilterBin}
         end,
         end,
         #{},
         #{},
         S
         S
@@ -100,7 +100,7 @@ fold_private_subscriptions(Fun, Acc, S) ->
     emqx_persistent_session_ds_state:fold_subscriptions(
     emqx_persistent_session_ds_state:fold_subscriptions(
         fun
         fun
             (#share{}, _Sub, Acc0) -> Acc0;
             (#share{}, _Sub, Acc0) -> Acc0;
-            (TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0)
+            (TopicFilterBin, Sub, Acc0) -> Fun(TopicFilterBin, Sub, Acc0)
         end,
         end,
         Acc,
         Acc,
         S
         S
@@ -298,11 +298,11 @@ cold_get_subscription(SessionId, Topic) ->
 now_ms() ->
 now_ms() ->
     erlang:system_time(millisecond).
     erlang:system_time(millisecond).
 
 
-watch_streams(TopicFilter) ->
+watch_streams(TopicFilterBin) ->
     {ok, Ref} = emqx_ds_new_streams:watch(
     {ok, Ref} = emqx_ds_new_streams:watch(
-        ?PERSISTENT_MESSAGE_DB, emqx_topic:words(TopicFilter)
+        ?PERSISTENT_MESSAGE_DB, emqx_topic:words(TopicFilterBin)
     ),
     ),
-    ?tp(debug, sessds_watch_streams, #{topic_filter => TopicFilter, ref => Ref}),
+    ?tp(debug, sessds_watch_streams, #{topic_filter => TopicFilterBin, ref => Ref}),
     Ref.
     Ref.
 
 
 unwatch_streams(TopicFilter, NewStreamSubs) ->
 unwatch_streams(TopicFilter, NewStreamSubs) ->

+ 4 - 6
apps/emqx/test/emqx_persistent_session_ds_SUITE.erl

@@ -243,7 +243,6 @@ stop_and_commit(Client) ->
 %% `end_of_stream' and doesn't violate the ordering of messages that
 %% `end_of_stream' and doesn't violate the ordering of messages that
 %% are split into different generations.
 %% are split into different generations.
 t_storage_generations(Config) ->
 t_storage_generations(Config) ->
-    [Node1Spec | _] = ?config(node_specs, Config),
     [Node1] = ?config(nodes, Config),
     [Node1] = ?config(nodes, Config),
     Port = get_mqtt_port(Node1, tcp),
     Port = get_mqtt_port(Node1, tcp),
     TopicFilter = <<"t/+">>,
     TopicFilter = <<"t/+">>,
@@ -435,8 +434,7 @@ t_subscription_state_change(Config) ->
     WaitGC = fun() ->
     WaitGC = fun() ->
         ?wait_async_action(
         ?wait_async_action(
             emqx_ds_new_streams:set_dirty(?PERSISTENT_MESSAGE_DB),
             emqx_ds_new_streams:set_dirty(?PERSISTENT_MESSAGE_DB),
-            #{?snk_kind := sessds_renew_streams, ?snk_meta := #{clientid := ClientId}},
-            timer:seconds(10)
+            #{?snk_kind := sessds_renew_streams, ?snk_meta := #{clientid := ClientId}}
         )
         )
     end,
     end,
     %% Helper function that gets runtime state of the session:
     %% Helper function that gets runtime state of the session:
@@ -543,11 +541,11 @@ t_new_stream_notifications(Config) ->
             ),
             ),
             %% Verify that stream notifications are handled:
             %% Verify that stream notifications are handled:
             ?wait_async_action(
             ?wait_async_action(
-                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, <<"bar">>),
+                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, [<<"bar">>]),
                 #{?snk_kind := sessds_renew_streams, topic_filter := <<"bar">>}
                 #{?snk_kind := sessds_renew_streams, topic_filter := <<"bar">>}
             ),
             ),
             ?wait_async_action(
             ?wait_async_action(
-                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, <<"foo/1">>),
+                emqx_ds_new_streams:notify_new_stream(?PERSISTENT_MESSAGE_DB, [<<"foo">>, <<"1">>]),
                 #{?snk_kind := sessds_renew_streams, topic_filter := <<"foo/+">>}
                 #{?snk_kind := sessds_renew_streams, topic_filter := <<"foo/+">>}
             ),
             ),
             %% Verify that new stream subscriptions are removed when
             %% Verify that new stream subscriptions are removed when
@@ -567,7 +565,7 @@ t_new_stream_notifications(Config) ->
                 [], ?of_kind(sessds_new_stream_notification_for_undefined_subscription, Trace)
                 [], ?of_kind(sessds_new_stream_notification_for_undefined_subscription, Trace)
             ),
             ),
             ?assertMatch(
             ?assertMatch(
-                [], ?of_kind(sessds_unexpected_stream_notifiction, Trace)
+                [], ?of_kind(sessds_unexpected_stream_notification, Trace)
             )
             )
         end
         end
     ).
     ).

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds_new_streams.erl

@@ -14,9 +14,9 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-%% @doc This module is used for notifying processes about new streams.
-%% It's not a replacement for `emqx_ds:get_streams' function, it's
-%% only meant to optimize its usage.
+%% @doc This module can be used by DS backends for notifying processes
+%% about new streams. It's not a replacement for `emqx_ds:get_streams'
+%% function, it's only meant to optimize its usage.
 %%
 %%
 %% `emqx_ds_new_streams' module tries to avoid waking up subscribers
 %% `emqx_ds_new_streams' module tries to avoid waking up subscribers
 %% too often. It's done like this:
 %% too often. It's done like this:
@@ -277,7 +277,7 @@ handle_watch({Pid, _}, TopicFilter, Data) ->
         {ok, MRef}
         {ok, MRef}
     catch
     catch
         EC:Err:Stack ->
         EC:Err:Stack ->
-            demonitor(MRef),
+            demonitor(MRef, [flush]),
             ?tp(
             ?tp(
                 error,
                 error,
                 ds_new_streams_failed_to_insert,
                 ds_new_streams_failed_to_insert,

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl

@@ -245,7 +245,7 @@ commit_batch(
         ),
         ),
         %% Apply LTS ops to the memory cache and notify about the new
         %% Apply LTS ops to the memory cache and notify about the new
         %% streams. Note: in case of `builtin_raft' backend
         %% streams. Note: in case of `builtin_raft' backend
-        %% notifiction is sent _for every replica_ of the database, to
+        %% notification is sent _for every replica_ of the database, to
         %% account for possible delays in the replication. Event
         %% account for possible delays in the replication. Event
         %% deduplication logic of `emqx_ds_new_streams' module should
         %% deduplication logic of `emqx_ds_new_streams' module should
         %% mitigate the performance impact of repeated events.
         %% mitigate the performance impact of repeated events.

+ 11 - 3
apps/emqx_durable_storage/src/emqx_ds_sup.erl

@@ -18,7 +18,7 @@
 -behaviour(supervisor).
 -behaviour(supervisor).
 
 
 %% API:
 %% API:
--export([start_link/0]).
+-export([start_link/0, start_link_watch_sup/0]).
 -export([register_db/2, unregister_db/1, which_dbs/0]).
 -export([register_db/2, unregister_db/1, which_dbs/0]).
 
 
 %% behaviour callbacks:
 %% behaviour callbacks:
@@ -41,8 +41,17 @@
 start_link() ->
 start_link() ->
     supervisor:start_link({local, ?TOP}, ?MODULE, top).
     supervisor:start_link({local, ?TOP}, ?MODULE, top).
 
 
+-spec start_link_watch_sup() -> {ok, pid()}.
+start_link_watch_sup() ->
+    supervisor:start_link({local, ?WATCH_SUP}, ?MODULE, new_streams_watch_sup).
+
 register_db(DB, Backend) ->
 register_db(DB, Backend) ->
     ets:insert(?TAB, {DB, Backend}),
     ets:insert(?TAB, {DB, Backend}),
+    %% Currently children of this supervisor are never stopped. This
+    %% is done intentionally: since clients don't monitor (or link to)
+    %% the `emqx_ds_new_streams' server, stopping it would leave them
+    %% with disfunctional subscriptions. To avoid this, the new stream
+    %% subscription servers for each DB should run indefinitely.
     _ = supervisor:start_child(?WATCH_SUP, [DB]),
     _ = supervisor:start_child(?WATCH_SUP, [DB]),
     ok.
     ok.
 
 
@@ -63,8 +72,7 @@ init(top) ->
         emqx_ds_builtin_metrics:child_spec(),
         emqx_ds_builtin_metrics:child_spec(),
         #{
         #{
             id => new_streams_watch_sup,
             id => new_streams_watch_sup,
-            start =>
-                {supervisor, start_link, [{local, ?WATCH_SUP}, ?MODULE, new_streams_watch_sup]},
+            start => {?MODULE, start_link_watch_sup, []},
             type => supervisor,
             type => supervisor,
             restart => permanent
             restart => permanent
         }
         }

+ 2 - 2
apps/emqx_durable_storage/test/emqx_ds_new_streams_SUITE.erl

@@ -93,7 +93,7 @@ t_matching(_) ->
             {ok, Ref1} = emqx_ds_new_streams:watch(DB, [<<"foo">>, <<"1">>]),
             {ok, Ref1} = emqx_ds_new_streams:watch(DB, [<<"foo">>, <<"1">>]),
             {ok, Ref2} = emqx_ds_new_streams:watch(DB, [<<"foo">>, '+']),
             {ok, Ref2} = emqx_ds_new_streams:watch(DB, [<<"foo">>, '+']),
             {ok, Ref3} = emqx_ds_new_streams:watch(DB, [<<"foo">>, '#']),
             {ok, Ref3} = emqx_ds_new_streams:watch(DB, [<<"foo">>, '#']),
-            {ok, Ref4} = emqx_ds_new_streams:watch(DB, [<<>>]),
+            {ok, Ref4} = emqx_ds_new_streams:watch(DB, ['']),
             %% Try patterns that aren't matched by any subscription:
             %% Try patterns that aren't matched by any subscription:
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"bar">>]),
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"bar">>]),
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"bar">>, '+']),
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"bar">>, '+']),
@@ -109,7 +109,7 @@ t_matching(_) ->
             %% This should include empty topic subscriptions as well:
             %% This should include empty topic subscriptions as well:
             ok = emqx_ds_new_streams:notify_new_stream(DB, ['#']),
             ok = emqx_ds_new_streams:notify_new_stream(DB, ['#']),
             assertEvents([Ref1, Ref2, Ref3, Ref4]),
             assertEvents([Ref1, Ref2, Ref3, Ref4]),
-            ok = emqx_ds_new_streams:notify_new_stream(DB, [<<>>]),
+            ok = emqx_ds_new_streams:notify_new_stream(DB, ['']),
             assertEvents([Ref4]),
             assertEvents([Ref4]),
             %% These patterns should exclude the first subscriber:
             %% These patterns should exclude the first subscriber:
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"foo">>, <<"2">>]),
             ok = emqx_ds_new_streams:notify_new_stream(DB, [<<"foo">>, <<"2">>]),