Procházet zdrojové kódy

docs(ps_session): add notes explaining rationale about operation order and consistency

Thales Macedo Garitezi před 2 roky
rodič
revize
f55f6dc779
1 změnil soubory, kde provedl 20 přidání a 0 odebrání
  1. 20 0
      apps/emqx/src/emqx_persistent_session_ds.erl

+ 20 - 0
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -99,6 +99,24 @@ open_session(ClientID) ->
 add_subscription(TopicFilterBin, DSSessionID) ->
 add_subscription(TopicFilterBin, DSSessionID) ->
     ?WHEN_ENABLED(
     ?WHEN_ENABLED(
         begin
         begin
+            %% N.B.: we chose to update the router before adding the subscription to the
+            %% session/iterator table.  The reasoning for this is as follows:
+            %%
+            %% Messages matching this topic filter should start to be persisted as soon as
+            %% possible to avoid missing messages.  If this is the first such persistent
+            %% session subscription, it's important to do so early on.
+            %%
+            %% This could, in turn, lead to some inconsistency: if such a route gets
+            %% created but the session/iterator data fails to be updated accordingly, we
+            %% have a dangling route.  To remove such dangling routes, we may have a
+            %% periodic GC process that removes routes that do not have a matching
+            %% persistent subscription.  Also, route operations use dirty mnesia
+            %% operations, which inherently have room for inconsistencies.
+            %%
+            %% In practice, we use the iterator reference table as a source of truth,
+            %% since it is guarded by a transaction context: we consider a subscription
+            %% operation to be successful if it ended up changing this table.  Both router
+            %% and iterator information can be reconstructed from this table, if needed.
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID),
             TopicFilter = emqx_topic:words(TopicFilterBin),
             TopicFilter = emqx_topic:words(TopicFilterBin),
             {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
             {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator(
@@ -146,6 +164,8 @@ do_open_iterator(TopicFilter, StartMS, IteratorID) ->
 del_subscription(TopicFilterBin, DSSessionID) ->
 del_subscription(TopicFilterBin, DSSessionID) ->
     ?WHEN_ENABLED(
     ?WHEN_ENABLED(
         begin
         begin
+            %% N.B.: see comments in `?MODULE:add_subscription' for a discussion about the
+            %% order of operations here.
             TopicFilter = emqx_topic:words(TopicFilterBin),
             TopicFilter = emqx_topic:words(TopicFilterBin),
             case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
             case emqx_ds:session_get_iterator_id(DSSessionID, TopicFilter) of
                 {error, not_found} ->
                 {error, not_found} ->