emqx_exclusive_subscription.erl 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  -module(emqx_exclusive_subscription).

  -include_lib("emqx/include/emqx.hrl").
  -include_lib("emqx/include/logger.hrl").

  -logger_header("[exclusive]").

  %% Mnesia bootstrap
  -export([mnesia/1]).
  -boot_mnesia({mnesia, [boot]}).
  -copy_mnesia({mnesia, [copy]}).

  %% For upgrade
  -export([on_add_module/0, on_delete_module/0]).

  %% Public API
  -export([check_subscribe/2, unsubscribe/2, clear/0]).

  %% Internal exports (RPC): try_subscribe/2 is referenced as an external fun
  %% (fun ?MODULE:try_subscribe/2) when handed to the transaction, so it must
  %% stay exported.
  -export([try_subscribe/2]).
  %% One row per exclusively-held topic. `topic' is the first field and thus the
  %% table key; `clientid' records which client claimed the topic.
  -record(exclusive_subscription, {
      topic :: emqx_types:topic(),
      clientid :: emqx_types:clientid()
  }).

  %% Name of the table holding #exclusive_subscription{} rows.
  -define(TAB, emqx_exclusive_subscription).
  %% Rlog shard the table is created in and that transactions run against.
  -define(EXCLUSIVE_SHARD, emqx_exclusive_shard).
  41. %%--------------------------------------------------------------------
  42. %% Mnesia bootstrap
  43. %%--------------------------------------------------------------------
  %% @doc Mnesia bootstrap hook (referenced by the `boot_mnesia' attribute).
  %%
  %% Creates `?TAB' as a RAM-only `set' table inside `?EXCLUSIVE_SHARD', with ETS
  %% read and write concurrency enabled, then calls
  %% mria_rlog:wait_for_shards/2 on that shard with an `infinity' timeout.
  %% Both steps are asserted to return `ok'; anything else crashes with badmatch.
  %% Only the `boot' argument is handled.
  mnesia(boot) ->
      EtsOpts = [{read_concurrency, true}, {write_concurrency, true}],
      TabDef = [
          {rlog_shard, ?EXCLUSIVE_SHARD},
          {type, set},
          {storage, ram_copies},
          {record_name, exclusive_subscription},
          {attributes, record_info(fields, exclusive_subscription)},
          {storage_properties, [{ets, EtsOpts}]}
      ],
      ok = mria:create_table(?TAB, TabDef),
      ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity).
  60. %%--------------------------------------------------------------------
  61. %% Upgrade
  62. %%--------------------------------------------------------------------
  %% @doc Upgrade hook run when this module is added: performs exactly the same
  %% table bootstrap as `mnesia(boot)' and returns its result (`ok').
  -spec on_add_module() -> ok.
  on_add_module() ->
      mnesia(boot).
  %% @doc Upgrade hook run when this module is removed: drops every stored
  %% exclusive subscription by delegating to clear/0, whose result is returned
  %% unchanged.
  on_delete_module() ->
      clear().
  67. %%--------------------------------------------------------------------
  68. %% APIs
  69. %%--------------------------------------------------------------------
  %% @doc Try to claim `Topic' exclusively for the client described by the
  %% client-info map (only its `clientid' key is read).
  %%
  %% Runs try_subscribe/2 in a transaction on `?EXCLUSIVE_SHARD' and returns its
  %% verdict (`allow' | `deny'). If the transaction aborts, a warning is logged
  %% and the request is denied.
  -spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
      allow | deny.
  check_subscribe(#{clientid := ClientId}, Topic) ->
      case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of
          {atomic, Res} ->
              Res;
          {aborted, Reason} ->
              %% `msg' in a structured log report is a fixed tag, not a format
              %% string: "~p" placeholders would be emitted verbatim. The
              %% variable parts travel as separate report fields instead.
              ?SLOG(warning, #{
                  msg => "check_subscribe_aborted",
                  clientid => ClientId,
                  topic => Topic,
                  reason => Reason
              }),
              deny
      end.
  %% @doc Release the exclusive claim on `Topic' when the subscription being
  %% removed was an exclusive one (`is_exclusive := true' in its options).
  %% For any other subscription options this is a no-op.
  %%
  %% Always returns `ok'. Releasing is best-effort, but an aborted delete is
  %% logged, because it leaves the topic's entry in place and later claims on
  %% that topic may then be denied.
  -spec unsubscribe(emqx_types:topic(), emqx_types:subopts()) -> ok.
  unsubscribe(Topic, #{is_exclusive := true}) ->
      case mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB, Topic}]) of
          {aborted, Reason} ->
              ?SLOG(warning, #{
                  msg => "exclusive_unsubscribe_aborted",
                  topic => Topic,
                  reason => Reason
              }),
              ok;
          {atomic, _} ->
              ok
      end;
  unsubscribe(_Topic, _SubOpts) ->
      ok.
  %% @doc Remove every exclusive-subscription entry by clearing `?TAB' through
  %% mria:clear_table/1. That call's result is returned as-is.
  clear() ->
      mria:clear_table(?TAB).
  89. %%--------------------------------------------------------------------
  90. %% Internal functions
  91. %%--------------------------------------------------------------------
  %% @doc Transaction body for check_subscribe/2; must run inside a
  %% mnesia/mria transaction.
  %%
  %% Reads the entry for `Topic' with a write lock and decides:
  %%   - no entry: record `ClientId' as the holder and return `allow';
  %%   - entry held by the same `ClientId': return `allow' without rewriting it,
  %%     so a holder that asks again (e.g. re-subscribes without having
  %%     unsubscribed first) is not locked out by its own entry;
  %%   - entry held by another client: return `deny'.
  -spec try_subscribe(emqx_types:clientid(), emqx_types:topic()) -> allow | deny.
  try_subscribe(ClientId, Topic) ->
      case mnesia:wread({?TAB, Topic}) of
          [] ->
              mnesia:write(
                  ?TAB,
                  #exclusive_subscription{
                      clientid = ClientId,
                      topic = Topic
                  },
                  write
              ),
              allow;
          %% ClientId is already bound, so this clause only matches the
          %% current holder's own entry.
          [#exclusive_subscription{clientid = ClientId}] ->
              allow;
          [_] ->
              deny
      end.