emqx_persistent_session_ds_subs.erl 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
%% @doc This module encapsulates the data related to the client's
%% subscriptions. It tries to represent the subscriptions as if they
%% were a simple key-value map.
%%
%% In reality, however, the session has to retain old
%% subscriptions for longer to ensure the consistency of message
%% replay.
  23. -module(emqx_persistent_session_ds_subs).
  24. %% API:
  25. -export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]).
  26. -export_type([]).
  27. %%================================================================================
  28. %% Type declarations
  29. %%================================================================================
  30. %%================================================================================
  31. %% API functions
  32. %%================================================================================
  33. %% @doc Process a new subscription
  34. -spec on_subscribe(
  35. emqx_persistent_session_ds:topic_filter(),
  36. emqx_persistent_session_ds:subscription(),
  37. emqx_persistent_session_ds_state:t()
  38. ) ->
  39. emqx_persistent_session_ds_state:t().
  40. on_subscribe(TopicFilter, Subscription, S) ->
  41. emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S).
  42. %% @doc Process UNSUBSCRIBE
  43. -spec on_unsubscribe(
  44. emqx_persistent_session_ds:topic_filter(),
  45. emqx_persistent_session_ds:subscription(),
  46. emqx_persistent_session_ds_state:t()
  47. ) ->
  48. emqx_persistent_session_ds_state:t().
  49. on_unsubscribe(TopicFilter, Subscription0, S0) ->
  50. %% Note: we cannot delete the subscription immediately, since its
  51. %% metadata can be used during replay (see `process_batch'). We
  52. %% instead mark it as deleted, and let `subscription_gc' function
  53. %% dispatch it later:
  54. Subscription = Subscription0#{deleted => true},
  55. emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0).
  56. %% @doc Remove subscriptions that have been marked for deletion, and
  57. %% that don't have any unacked messages:
  58. -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
  59. gc(S0) ->
  60. fold_all(
  61. fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) ->
  62. case Deleted andalso has_no_unacked_streams(SubId, S0) of
  63. true ->
  64. emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
  65. false ->
  66. Acc
  67. end
  68. end,
  69. S0,
  70. S0
  71. ).
  72. %% @doc Fold over active subscriptions:
  73. -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
  74. emqx_persistent_session_ds:subscription() | undefined.
  75. lookup(TopicFilter, S) ->
  76. Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
  77. case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
  78. #{deleted := true} ->
  79. undefined;
  80. Sub ->
  81. Sub
  82. end.
  83. %% @doc Convert active subscriptions to a map, for information
  84. %% purpose:
  85. -spec to_map(emqx_persistent_session_ds_state:t()) -> map().
  86. to_map(S) ->
  87. fold(
  88. fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
  89. #{},
  90. S
  91. ).
  92. %% @doc Fold over active subscriptions:
  93. -spec fold(
  94. fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
  95. Acc,
  96. emqx_persistent_session_ds_state:t()
  97. ) ->
  98. Acc.
  99. fold(Fun, AccIn, S) ->
  100. fold_all(
  101. fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
  102. case Deleted of
  103. true -> Acc;
  104. false -> Fun(TopicFilter, Sub, Acc)
  105. end
  106. end,
  107. AccIn,
  108. S
  109. ).
  110. %% @doc Fold over all subscriptions, including inactive ones:
  111. -spec fold_all(
  112. fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
  113. Acc,
  114. emqx_persistent_session_ds_state:t()
  115. ) ->
  116. Acc.
  117. fold_all(Fun, AccIn, S) ->
  118. Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
  119. emqx_topic_gbt:fold(
  120. fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
  121. AccIn,
  122. Subs
  123. ).
  124. %%================================================================================
  125. %% Internal functions
  126. %%================================================================================
  127. -spec has_no_unacked_streams(
  128. emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
  129. ) -> boolean().
  130. has_no_unacked_streams(SubId, S) ->
  131. emqx_persistent_session_ds_state:fold_streams(
  132. fun
  133. ({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
  134. emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
  135. (_StreamKey, _Srs, Acc) ->
  136. Acc
  137. end,
  138. true,
  139. S
  140. ).