emqx_ft_async_reply.erl 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_async_reply).
  17. -include_lib("emqx/include/logger.hrl").
  18. -include_lib("emqx/include/types.hrl").
  19. -include_lib("stdlib/include/ms_transform.hrl").
  20. -export([
  21. create_tables/0,
  22. info/0
  23. ]).
  24. -export([
  25. register/3,
  26. register/4,
  27. take_by_mref/1,
  28. with_new_packet/3,
  29. deregister_all/1
  30. ]).
  31. -type channel_pid() :: pid().
  32. -type mon_ref() :: reference().
  33. -type timer_ref() :: reference().
  34. -type packet_id() :: emqx_types:packet_id().
  35. %% packets waiting for async workers
  36. -define(MON_TAB, emqx_ft_async_mons).
  37. -define(MON_KEY(MRef), ?MON_KEY(self(), MRef)).
  38. -define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
  39. -define(MON_RECORD(KEY, PACKET_ID, TREF, DATA), {KEY, PACKET_ID, TREF, DATA}).
  40. %% async worker monitors by packet ids
  41. -define(PACKET_TAB, emqx_ft_async_packets).
  42. -define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)).
  43. -define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}).
  44. -define(PACKET_RECORD(KEY, MREF, DATA), {KEY, MREF, DATA}).
  45. %%--------------------------------------------------------------------
  46. %% API
  47. %% -------------------------------------------------------------------
  48. -spec create_tables() -> ok.
  49. create_tables() ->
  50. EtsOptions = [
  51. named_table,
  52. public,
  53. ordered_set,
  54. {read_concurrency, true},
  55. {write_concurrency, true}
  56. ],
  57. ok = emqx_utils_ets:new(?MON_TAB, EtsOptions),
  58. ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions),
  59. ok.
  60. -spec register(packet_id(), mon_ref(), timer_ref(), term()) -> ok.
  61. register(PacketId, MRef, TRef, Data) ->
  62. _ = ets:insert(?PACKET_TAB, ?PACKET_RECORD(?PACKET_KEY(PacketId), MRef, Data)),
  63. _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), PacketId, TRef, Data)),
  64. ok.
  65. -spec register(mon_ref(), timer_ref(), term()) -> ok.
  66. register(MRef, TRef, Data) ->
  67. _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), undefined, TRef, Data)),
  68. ok.
  69. -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
  70. with_new_packet(PacketId, Fun, Default) ->
  71. case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of
  72. true -> Default;
  73. false -> Fun()
  74. end.
  75. -spec take_by_mref(mon_ref()) -> {ok, packet_id() | undefined, timer_ref(), term()} | not_found.
  76. take_by_mref(MRef) ->
  77. case ets:take(?MON_TAB, ?MON_KEY(MRef)) of
  78. [?MON_RECORD(_, PacketId, TRef, Data)] ->
  79. PacketId =/= undefined andalso ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
  80. {ok, PacketId, TRef, Data};
  81. [] ->
  82. not_found
  83. end.
  84. -spec deregister_all(channel_pid()) -> ok.
  85. deregister_all(ChannelPid) ->
  86. ok = deregister_packets(ChannelPid),
  87. ok = deregister_mons(ChannelPid),
  88. ok.
  89. -spec info() -> {non_neg_integer(), non_neg_integer()}.
  90. info() ->
  91. {ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}.
  92. %%--------------------------------------------------------------------
  93. %% Internal
  94. %%-------------------------------------------------------------------
  95. deregister_packets(ChannelPid) when is_pid(ChannelPid) ->
  96. MS = [{?PACKET_RECORD(?PACKET_KEY(ChannelPid, '_'), '_', '_'), [], [true]}],
  97. _ = ets:select_delete(?PACKET_TAB, MS),
  98. ok.
  99. deregister_mons(ChannelPid) ->
  100. MS = [{?MON_RECORD(?MON_KEY(ChannelPid, '_'), '_', '_', '_'), [], [true]}],
  101. _ = ets:select_delete(?MON_TAB, MS),
  102. ok.