|
|
@@ -14,6 +14,8 @@
|
|
|
%% limitations under the License.
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%% TODO: remove the v1 table in release-590
|
|
|
+
|
|
|
-module(emqx_exclusive_subscription).
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
@@ -49,6 +51,11 @@
|
|
|
]).
|
|
|
|
|
|
-record(exclusive_subscription, {
|
|
|
+ topic :: emqx_types:topic(),
|
|
|
+ clientid :: emqx_types:clientid()
|
|
|
+}).
|
|
|
+
|
|
|
+-record(exclusive_subscription_v2, {
|
|
|
topic :: emqx_types:topic(),
|
|
|
clientid :: emqx_types:clientid(),
|
|
|
node :: node(),
|
|
|
@@ -56,6 +63,7 @@
|
|
|
}).
|
|
|
|
|
|
-define(TAB, emqx_exclusive_subscription).
|
|
|
+-define(TAB_V2, emqx_exclusive_subscription_v2).
|
|
|
-define(EXCLUSIVE_SHARD, emqx_exclusive_shard).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -70,38 +78,25 @@ init_tables() ->
|
|
|
]}
|
|
|
],
|
|
|
|
|
|
- Fields = record_info(fields, exclusive_subscription),
|
|
|
-
|
|
|
ok = mria:create_table(?TAB, [
|
|
|
{rlog_shard, ?EXCLUSIVE_SHARD},
|
|
|
{type, set},
|
|
|
{storage, ram_copies},
|
|
|
{record_name, exclusive_subscription},
|
|
|
- {attributes, Fields},
|
|
|
+ {attributes, record_info(fields, exclusive_subscription)},
|
|
|
{storage_properties, StoreProps}
|
|
|
]),
|
|
|
- ok = mria:wait_for_tables([?TAB]),
|
|
|
- try_upgrade_table(Fields).
|
|
|
-
|
|
|
-try_upgrade_table(Fields) ->
|
|
|
- case mnesia:table_info(?TAB, attributes) =:= Fields of
|
|
|
- true ->
|
|
|
- ok;
|
|
|
- false ->
|
|
|
- TransFun = fun
|
|
|
- ({exclusive_subscription, Topic, ClientId}) ->
|
|
|
- #exclusive_subscription{
|
|
|
- topic = Topic,
|
|
|
- clientid = ClientId,
|
|
|
- node = undefined,
|
|
|
- extra = #{}
|
|
|
- };
|
|
|
- (Data = #exclusive_subscription{}) ->
|
|
|
- Data
|
|
|
- end,
|
|
|
- {atomic, ok} = mnesia:transform_table(?TAB, TransFun, Fields),
|
|
|
- ok
|
|
|
- end.
|
|
|
+
|
|
|
+ ok = mria:create_table(?TAB_V2, [
|
|
|
+ {rlog_shard, ?EXCLUSIVE_SHARD},
|
|
|
+ {type, set},
|
|
|
+ {storage, ram_copies},
|
|
|
+ {record_name, exclusive_subscription_v2},
|
|
|
+ {attributes, record_info(fields, exclusive_subscription_v2)},
|
|
|
+ {storage_properties, StoreProps}
|
|
|
+ ]),
|
|
|
+
|
|
|
+ ok = mria:wait_for_tables([?TAB, ?TAB_V2]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% APIs
|
|
|
@@ -127,6 +122,7 @@ check_subscribe(#{clientid := ClientId}, Topic) ->
|
|
|
|
|
|
unsubscribe(Topic, #{is_exclusive := true}) ->
|
|
|
_ = mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB, Topic}]),
|
|
|
+ _ = mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB_V2, Topic}]),
|
|
|
ok;
|
|
|
unsubscribe(_Topic, _SubOpts) ->
|
|
|
ok.
|
|
|
@@ -140,7 +136,9 @@ dirty_lookup_clientid(Topic) ->
|
|
|
end.
|
|
|
|
|
|
clear() ->
|
|
|
- mria:clear_table(?TAB).
|
|
|
+ _ = mria:clear_table(?TAB),
|
|
|
+ _ = mria:clear_table(?TAB_V2),
|
|
|
+ ok.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
@@ -193,6 +191,14 @@ try_subscribe(ClientId, Topic, Node) ->
|
|
|
mnesia:write(
|
|
|
?TAB,
|
|
|
#exclusive_subscription{
|
|
|
+ clientid = ClientId,
|
|
|
+ topic = Topic
|
|
|
+ },
|
|
|
+ write
|
|
|
+ ),
|
|
|
+ mnesia:write(
|
|
|
+ ?TAB_V2,
|
|
|
+ #exclusive_subscription_v2{
|
|
|
clientid = ClientId,
|
|
|
topic = Topic,
|
|
|
node = Node
|
|
|
@@ -220,14 +226,15 @@ cleanup_subscriptions(Node) ->
|
|
|
).
|
|
|
|
|
|
do_cleanup_subscriptions(Node0) ->
|
|
|
- Spec = ets:fun2ms(fun(#exclusive_subscription{node = Node} = Data) when
|
|
|
+ Spec = ets:fun2ms(fun(#exclusive_subscription_v2{node = Node} = Data) when
|
|
|
Node0 =:= Node
|
|
|
->
|
|
|
Data
|
|
|
end),
|
|
|
lists:foreach(
|
|
|
- fun(Obj) ->
|
|
|
- mnesia:delete_object(?TAB, Obj, write)
|
|
|
+ fun(#exclusive_subscription_v2{topic = Topic} = Obj) ->
|
|
|
+ mnesia:delete({?TAB, Topic}),
|
|
|
+ mnesia:delete_object(?TAB_V2, Obj, write)
|
|
|
end,
|
|
|
- mnesia:select(?TAB, Spec, write)
|
|
|
+ mnesia:select(?TAB_V2, Spec, write)
|
|
|
).
|