Просмотр исходного кода

feat(dssubs): handle concurrent and live destroys reasonably

Andrew Mayorov 1 год назад
Родитель
Коммит
db5e480d54

+ 19 - 18
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_api.erl

@@ -10,18 +10,19 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 
+-define(DESC_NOT_FOUND, <<"Queue not found">>).
 -define(RESP_NOT_FOUND,
-    {404, #{
-        code => <<"NOT_FOUND">>,
-        message => <<"Queue not found">>
-    }}
+    {404, #{code => <<"NOT_FOUND">>, message => ?DESC_NOT_FOUND}}
 ).
 
+-define(DESC_CREATE_CONFICT, <<"Queue with given group name and topic filter already exists">>).
 -define(RESP_CREATE_CONFLICT,
-    {409, #{
-        code => <<"CONFLICT">>,
-        message => <<"Queue with given group name and topic filter already exists">>
-    }}
+    {409, #{code => <<"CONFLICT">>, message => ?DESC_CREATE_CONFICT}}
+).
+
+-define(DESC_DELETE_CONFLICT, <<"Queue is currently active">>).
+-define(RESP_DELETE_CONFLICT,
+    {409, #{code => <<"CONFLICT">>, message => ?DESC_DELETE_CONFLICT}}
 ).
 
 -define(RESP_INTERNAL_ERROR(MSG),
@@ -63,8 +64,6 @@ paths() ->
         "/durable_queues/:id"
     ].
 
--define(NOT_FOUND, 'NOT_FOUND').
-
 schema("/durable_queues") ->
     #{
         'operationId' => '/durable_queues',
@@ -88,7 +87,8 @@ schema("/durable_queues") ->
                 201 => emqx_dashboard_swagger:schema_with_example(
                     durable_queue_get(),
                     durable_queue_get_example()
-                )
+                ),
+                409 => error_codes(['CONFLICT'], ?DESC_CREATE_CONFICT)
             }
         }
     };
@@ -105,7 +105,7 @@ schema("/durable_queues/:id") ->
                     durable_queue_get(),
                     durable_queue_get_example()
                 ),
-                404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
+                404 => error_codes(['NOT_FOUND'], ?DESC_NOT_FOUND)
             }
         },
         delete => #{
@@ -115,7 +115,8 @@ schema("/durable_queues/:id") ->
             parameters => [param_queue_id()],
             responses => #{
                 200 => <<"Queue deleted">>,
-                404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
+                404 => error_codes(['NOT_FOUND'], ?DESC_NOT_FOUND),
+                409 => error_codes(['CONFLICT'], ?DESC_DELETE_CONFLICT)
             }
         }
     }.
@@ -144,13 +145,13 @@ schema("/durable_queues/:id") ->
         ok ->
             {200, <<"Queue deleted">>};
         not_found ->
-            ?RESP_NOT_FOUND
+            ?RESP_NOT_FOUND;
+        conflict ->
+            ?RESP_DELETE_CONFLICT;
+        {error, _Class, Reason} ->
+            ?RESP_INTERNAL_ERROR(emqx_utils:readable_error_msg(Reason))
     end.
 
-%%--------------------------------------------------------------------
-%% Actual handlers: stubs
-%%--------------------------------------------------------------------
-
 queue_list() ->
     %% TODO
     [].

+ 13 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -184,6 +184,8 @@ handle_event({timeout, #renew_leader_claim{}}, #renew_leader_claim{}, ?leader_ac
         Data1 = #{} ->
             Actions = init_claim_renewal(Data1),
             {keep_state, Data1, Actions};
+        {stop, Reason} ->
+            {stop, {shutdown, Reason}};
         Error ->
             {stop, Error}
     end;
@@ -267,15 +269,24 @@ renew_leader_claim(Data = #{group_id := ShareTopicFilter, store := Store0, leade
         {ok, RenewedClaim, CommittedStore} ->
             ?tp(shared_sub_leader_store_committed, #{
                 id => ShareTopicFilter,
-                store => emqx_ds_shared_sub_store:mk_id(ShareTopicFilter),
+                store => emqx_ds_shared_sub_store:id(CommittedStore),
                 claim => Claim,
                 renewed => RenewedClaim
             }),
             attach_claim(RenewedClaim, Data#{store := CommittedStore});
+        destroyed ->
+            %% NOTE
+            %% Not doing anything under the assumption that destroys happen long after
+            %% clients are gone and leaders are dead.
+            ?tp(warning, "Shared subscription leader store destroyed", #{
+                id => ShareTopicFilter,
+                store => emqx_ds_shared_sub_store:id(Store0)
+            }),
+            {stop, shared_subscription_destroyed};
         {error, Class, Reason} = Error ->
             ?tp(warning, "Shared subscription leader store commit failed", #{
                 id => ShareTopicFilter,
-                store => emqx_ds_shared_sub_store:mk_id(ShareTopicFilter),
+                store => emqx_ds_shared_sub_store:id(Store0),
                 claim => Claim,
                 reason => Reason
             }),

+ 2 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_queue.erl

@@ -58,15 +58,15 @@ destroy(Group, Topic) ->
     destroy(emqx_ds_shared_sub_store:mk_id(Group, Topic)).
 
 destroy(ID) ->
-    %% FIXME: Sync on leader.
     %% TODO: There's an obvious lack of transactionality.
     case lookup(ID) of
         false ->
             not_found;
         Queue ->
+            #{topic := Topic} = properties(Queue),
             case emqx_ds_shared_sub_store:destroy(Queue) of
                 ok ->
-                    _ = ensure_delete_route(maps:get(topic, properties(Queue)), ID),
+                    _ = ensure_delete_route(Topic, ID),
                     ok;
                 Error ->
                     Error

+ 2 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl

@@ -70,6 +70,8 @@ start_elector(ShareTopic) ->
         shutdown => 5000
     }).
 
+%%------------------------------------------------------------------------------
+
 -spec purge() -> ok.
 purge() ->
     Children = supervisor:which_children(?MODULE),

+ 35 - 1
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -76,7 +76,7 @@ declare_queue(Group, Topic, Config) ->
 destroy_queue(Config) ->
     Group = proplists:get_value(queue_group, Config),
     Topic = proplists:get_value(queue_topic, Config),
-    ok = emqx_ds_shared_sub_queue:destroy(Group, Topic).
+    emqx_ds_shared_sub_queue:destroy(Group, Topic).
 
 t_lease_initial('init', Config) ->
     declare_queue(<<"gr1">>, <<"topic1/#">>, Config);
@@ -127,6 +127,40 @@ t_declare_triggers_persistence(_Config) ->
     ok = emqtt:disconnect(ConnShared),
     ok = emqtt:disconnect(ConnPub).
 
+t_destroy_queue_live_clients('init', Config) ->
+    declare_queue(<<"dqlc">>, <<"t1337/#">>, Config);
+t_destroy_queue_live_clients('end', Config) ->
+    destroy_queue(Config).
+
+t_destroy_queue_live_clients(Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
+    ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
+    {ok, _, [1]} = emqtt:subscribe(ConnShared1, <<"$share/dqlc/t1337/#">>, 1),
+    {ok, _, [1]} = emqtt:subscribe(ConnShared2, <<"$share/dqlc/t1337/#">>, 1),
+
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/1">>, <<"hello1">>, 1),
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/2">>, <<"hello2">>, 1),
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/3/4">>, <<"hello3">>, 1),
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/5/6">>, <<"hello4">>, 1),
+
+    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 1_000),
+    ?assertReceive({publish, #{payload := <<"hello3">>}}, 1_000),
+    ?assertReceive({publish, #{payload := <<"hello4">>}}, 1_000),
+
+    ok = destroy_queue(Config),
+
+    %% No more messages should be delivered after the queue was destroyed.
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/1">>, <<"hello5">>, 1),
+    {ok, _} = emqtt:publish(ConnPub, <<"t1337/2">>, <<"hello6">>, 1),
+    ?assertNotReceive({publish, #{payload := _}}, 5_000),
+
+    ok = emqtt:disconnect(ConnShared1),
+    ok = emqtt:disconnect(ConnShared2),
+    ok = emqtt:disconnect(ConnPub).
+
 t_two_clients('init', Config) ->
     declare_queue(<<"gr4">>, <<"topic4/#">>, Config);
 t_two_clients('end', Config) ->