emqx_ft_storage.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_ft_storage).
  17. -include_lib("emqx/include/types.hrl").
  18. -export(
  19. [
  20. store_filemeta/2,
  21. store_segment/2,
  22. assemble/3,
  23. kickoff/1,
  24. files/0,
  25. files/1,
  26. with_storage_type/2,
  27. with_storage_type/3,
  28. backend/0,
  29. update_config/2
  30. ]
  31. ).
  32. -type type() :: local.
  33. -type backend() :: {type(), storage()}.
  34. -type storage() :: config().
  35. -type config() :: emqx_config:config().
  36. -export_type([backend/0]).
  37. -export_type([assemble_callback/0]).
  38. -export_type([query/1]).
  39. -export_type([page/2]).
  40. -export_type([file_info/0]).
  41. -export_type([export_data/0]).
  42. -export_type([reader/0]).
  43. -type assemble_callback() :: fun((ok | {error, term()}) -> any()).
  44. -type query(Cursor) ::
  45. #{transfer => emqx_ft:transfer()}
  46. | #{
  47. limit => non_neg_integer(),
  48. following => Cursor
  49. }.
  50. -type page(Item, Cursor) :: #{
  51. items := [Item],
  52. cursor => Cursor
  53. }.
  54. -type file_info() :: #{
  55. transfer := emqx_ft:transfer(),
  56. name := file:name(),
  57. size := _Bytes :: non_neg_integer(),
  58. timestamp := emqx_utils_calendar:epoch_second(),
  59. uri => uri_string:uri_string(),
  60. meta => emqx_ft:filemeta()
  61. }.
  62. -type export_data() :: binary() | qlc:query_handle().
  63. -type reader() :: pid().
  64. %%--------------------------------------------------------------------
  65. %% Behaviour
  66. %%--------------------------------------------------------------------
  67. %% NOTE
  68. %% An async task will wait for a `kickoff` message to start processing, to give some time
  69. %% to set up monitors, etc. Async task will not explicitly report the processing result,
  70. %% you are expected to receive and handle exit reason of the process, which is
  71. %% -type result() :: `{shutdown, ok | {error, _}}`.
  72. -callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
  73. ok | {async, pid()} | {error, term()}.
  74. -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
  75. ok | {async, pid()} | {error, term()}.
  76. -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
  77. ok | {async, pid()} | {error, term()}.
  78. -callback files(storage(), query(Cursor)) ->
  79. {ok, page(file_info(), Cursor)} | {error, term()}.
  80. -callback start(storage()) -> any().
  81. -callback stop(storage()) -> any().
  82. -callback update_config(_OldConfig :: option(storage()), _NewConfig :: option(storage())) ->
  83. any().
  84. %%--------------------------------------------------------------------
  85. %% API
  86. %%--------------------------------------------------------------------
  87. -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
  88. ok | {async, pid()} | {error, term()}.
  89. store_filemeta(Transfer, FileMeta) ->
  90. dispatch(store_filemeta, [Transfer, FileMeta]).
  91. -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
  92. ok | {async, pid()} | {error, term()}.
  93. store_segment(Transfer, Segment) ->
  94. dispatch(store_segment, [Transfer, Segment]).
  95. -spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
  96. ok | {async, pid()} | {error, term()}.
  97. assemble(Transfer, Size, FinOpts) ->
  98. dispatch(assemble, [Transfer, Size, FinOpts]).
  99. -spec kickoff(pid()) -> ok.
  100. kickoff(Pid) ->
  101. _ = erlang:send(Pid, kickoff),
  102. ok.
  103. %%
  104. -spec files() ->
  105. {ok, page(file_info(), _)} | {error, term()}.
  106. files() ->
  107. files(#{}).
  108. -spec files(query(Cursor)) ->
  109. {ok, page(file_info(), Cursor)} | {error, term()}.
  110. files(Query) ->
  111. dispatch(files, [Query]).
  112. -spec dispatch(atom(), list(term())) -> any().
  113. dispatch(Fun, Args) when is_atom(Fun) ->
  114. {Type, Storage} = backend(),
  115. apply(mod(Type), Fun, [Storage | Args]).
  116. %%
  117. -spec with_storage_type(atom(), atom() | function()) -> any().
  118. with_storage_type(Type, Fun) ->
  119. with_storage_type(Type, Fun, []).
  120. -spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
  121. with_storage_type(Type, Fun, Args) ->
  122. case backend() of
  123. {Type, Storage} when is_atom(Fun) ->
  124. apply(mod(Type), Fun, [Storage | Args]);
  125. {Type, Storage} when is_function(Fun) ->
  126. apply(Fun, [Storage | Args]);
  127. {_, _} = Backend ->
  128. {error, {invalid_storage_backend, Backend}}
  129. end.
  130. %%
  131. -spec backend() -> backend().
  132. backend() ->
  133. backend(emqx_ft_conf:storage()).
  134. -spec update_config(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
  135. ok.
  136. update_config(ConfigOld, ConfigNew) ->
  137. on_backend_update(
  138. emqx_maybe:apply(fun backend/1, ConfigOld),
  139. emqx_maybe:apply(fun backend/1, ConfigNew)
  140. ).
  141. on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
  142. ok;
  143. on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
  144. ok = (mod(Type)):update_config(StorageOld, StorageNew);
  145. on_backend_update(BackendOld, BackendNew) when
  146. (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
  147. (BackendNew =:= undefined orelse is_tuple(BackendNew))
  148. ->
  149. _ = emqx_maybe:apply(fun stop_backend/1, BackendOld),
  150. _ = emqx_maybe:apply(fun start_backend/1, BackendNew),
  151. ok.
  152. %%--------------------------------------------------------------------
  153. %% Local API
  154. %%--------------------------------------------------------------------
  155. -spec backend(config()) -> backend().
  156. backend(Config) ->
  157. emqx_ft_schema:backend(Config).
  158. start_backend({Type, Storage}) ->
  159. (mod(Type)):start(Storage).
  160. stop_backend({Type, Storage}) ->
  161. (mod(Type)):stop(Storage).
  162. mod(local) ->
  163. emqx_ft_storage_fs.