Jelajahi Sumber

fix(dssubs): make store's `open/1` contract more explicit

Andrew Mayorov 1 tahun lalu
induk
melakukan
71a1ebc9ad

+ 1 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -135,7 +135,7 @@ schema("/durable_queues/:id") ->
 
 '/durable_queues/:id'(get, Params) ->
     case queue_get(Params) of
-        Queue when Queue =/= false ->
+        {ok, Queue} ->
             {200, encode_queue(Queue)};
         false ->
             ?RESP_NOT_FOUND

+ 2 - 3
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -93,7 +93,6 @@
 
 become(ShareTopicFilter, Claim) ->
     Data0 = init_data(ShareTopicFilter),
-    Data0 =/= false orelse exit(shared_subscription_not_declared),
     Data1 = attach_claim(Claim, Data0),
     gen_statem:enter_loop(?MODULE, [], ?leader_active, Data1, init_claim_renewal(Data1)).
 
@@ -110,7 +109,7 @@ init(_Args) ->
 init_data(#share{topic = Topic} = ShareTopicFilter) ->
     StoreID = emqx_ds_shared_sub_store:mk_id(ShareTopicFilter),
     case emqx_ds_shared_sub_store:open(StoreID) of
-        Store when Store =/= false ->
+        {ok, Store} ->
             ?tp(debug, dssub_store_open, #{topic => ShareTopicFilter, store => Store}),
             #{
                 group_id => ShareTopicFilter,
@@ -122,7 +121,7 @@ init_data(#share{topic = Topic} = ShareTopicFilter) ->
         false ->
             %% NOTE: No leader store -> no subscription
             ?tp(warning, dssub_store_notfound, #{topic => ShareTopicFilter}),
-            false
+            exit(shared_subscription_not_declared)
     end.
 
 attach_claim(Claim, Data) ->

+ 4 - 4
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -60,9 +60,7 @@ destroy(Group, Topic) ->
 destroy(ID) ->
     %% TODO: There's an obvious lack of transactionality.
     case lookup(ID) of
-        false ->
-            not_found;
-        Queue ->
+        {ok, Queue} ->
             #{topic := Topic} = properties(Queue),
             case emqx_ds_shared_sub_store:destroy(Queue) of
                 ok ->
@@ -70,7 +68,9 @@ destroy(ID) ->
                     ok;
                 Error ->
                     Error
-            end
+            end;
+        false ->
+            not_found
     end.
 
 ensure_route(Topic, QueueID) ->

+ 2 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -314,7 +314,7 @@ init(ID) ->
     %% NOTE: Empty store is impicitly dirty because rootset needs to be persisted.
     mk_store(ID).
 
--spec open(id()) -> t() | false.
+-spec open(id()) -> {ok, t()} | false | emqx_ds:error(_).
 open(ID) ->
     case open_rootset(ID) of
         Rootset = #{} ->
@@ -364,7 +364,7 @@ slurp_store(Rootset, StreamIts0, Retries, RetryTimeout, Acc = #{id := ID}) ->
         %% concerning, because this suggests there were concurrent writes that slipped
         %% past the leadership claim guards, yet we can still make progress.
         SeqNum when SeqNum >= map_get(seqnum, Rootset) ->
-            reset_dirty(maps:merge(Store, Rootset));
+            {ok, reset_dirty(maps:merge(Store, Rootset))};
         _Mismatch when Retries > 0 ->
             ok = timer:sleep(RetryTimeout),
             slurp_store(Rootset, StreamIts, Retries - 1, RetryTimeout, Store);