Просмотр исходного кода

fix(dssstate): correctly iterate subscriptions and their states for shared subs

Thales Macedo Garitezi 1 год назад
Родитель
Сommit
1e2eaded59

+ 26 - 8
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -27,6 +27,8 @@
 %% for use in the management APIs.
 -module(emqx_persistent_session_ds_state).
 
+-feature(maybe_expr, enable).
+
 -include_lib("emqx_durable_storage/include/emqx_ds.hrl").
 
 -ifdef(STORE_STATE_IN_DS).
@@ -737,10 +739,18 @@ get_subscription(TopicFilter, Rec) ->
     [emqx_persistent_session_ds_subs:subscription()].
 -ifdef(STORE_STATE_IN_DS).
 cold_get_subscription(SessionId, Topic) ->
-    Data = read_iterate(SessionId, [
-        ?subscription_domain_bin, key_encode(?subscription_domain, Topic)
-    ]),
-    lists:map(fun(#{val := V}) -> V end, Data).
+    maybe
+        [#{val := #{key_mappings := #{?subscription_domain := KeyMapping}}}] ?=
+            read_iterate(SessionId, [?metadata_domain_bin, ?metadata_domain_bin]),
+        {ok, IntKey} ?= maps:find(Topic, KeyMapping),
+        Data = read_iterate(
+            SessionId,
+            [?subscription_domain_bin, key_encode(?subscription_domain, IntKey)]
+        ),
+        lists:map(fun(#{val := V}) -> V end, Data)
+    else
+        _ -> []
+    end.
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
 cold_get_subscription(SessionId, Topic) ->
@@ -795,10 +805,18 @@ get_subscription_state(SStateId, Rec) ->
     [emqx_persistent_session_ds_subs:subscription_state()].
 -ifdef(STORE_STATE_IN_DS).
 cold_get_subscription_state(SessionId, SStateId) ->
-    Data = read_iterate(SessionId, [
-        ?subscription_state_domain_bin, key_encode(?subscription_state_domain, SStateId)
-    ]),
-    lists:map(fun(#{val := V}) -> V end, Data).
+    maybe
+        [#{val := #{key_mappings := #{?subscription_state_domain := KeyMapping}}}] ?=
+            read_iterate(SessionId, [?metadata_domain_bin, ?metadata_domain_bin]),
+        {ok, IntKey} ?= maps:find(SStateId, KeyMapping),
+        Data = read_iterate(
+            SessionId,
+            [?subscription_state_domain_bin, key_encode(?subscription_state_domain, IntKey)]
+        ),
+        lists:map(fun(#{val := V}) -> V end, Data)
+    else
+        _ -> []
+    end.
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
 cold_get_subscription_state(SessionId, SStateId) ->