emqx_ft_storage_exporter.erl 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% Filesystem storage exporter
  17. %%
  18. %% This is conceptually a part of the Filesystem storage backend that defines
  19. %% how and where complete transfers are assembled into files and stored.
  20. -module(emqx_ft_storage_exporter).
  21. %% Export API
  22. -export([start_export/3]).
  23. -export([write/2]).
  24. -export([complete/2]).
  25. -export([discard/1]).
  26. %% Listing API
  27. -export([list/2]).
  28. %% Lifecycle API
  29. -export([update_config/2]).
  30. %% Internal API
  31. -export([exporter/1]).
  32. -export_type([export/0, exporter_conf/0]).
  33. -type storage() :: emqx_ft_storage_fs:storage() | undefined.
  34. -type transfer() :: emqx_ft:transfer().
  35. -type filemeta() :: emqx_ft:filemeta().
  36. -type checksum() :: emqx_ft:checksum().
  37. -type exporter_conf() :: map().
  38. -type export_st() :: term().
  39. -type hash_state() :: term().
  40. -opaque export() :: #{
  41. mod := module(),
  42. st := export_st(),
  43. hash := hash_state(),
  44. filemeta := filemeta()
  45. }.
  46. %%------------------------------------------------------------------------------
  47. %% Behaviour
  48. %%------------------------------------------------------------------------------
  49. -callback start_export(exporter_conf(), transfer(), filemeta()) ->
  50. {ok, export_st()} | {error, _Reason}.
  51. %% Exporter must discard the export itself in case of error
  52. -callback write(ExportSt :: export_st(), iodata()) ->
  53. {ok, ExportSt :: export_st()} | {error, _Reason}.
  54. -callback complete(_ExportSt :: export_st(), _Checksum :: checksum()) ->
  55. ok | {error, _Reason}.
  56. -callback discard(ExportSt :: export_st()) ->
  57. ok | {error, _Reason}.
  58. -callback list(exporter_conf(), emqx_ft_storage:query(Cursor)) ->
  59. {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
  60. %% Lifecycle callbacks
  61. -callback start(exporter_conf()) ->
  62. ok | {error, _Reason}.
  63. -callback stop(exporter_conf()) ->
  64. ok.
  65. -callback update_config(exporter_conf(), exporter_conf()) ->
  66. ok | {error, _Reason}.
  67. %%------------------------------------------------------------------------------
  68. %% API
  69. %%------------------------------------------------------------------------------
  70. -spec start_export(storage(), transfer(), filemeta()) ->
  71. {ok, export()} | {error, _Reason}.
  72. start_export(Storage, Transfer, Filemeta) ->
  73. {ExporterMod, ExporterConf} = exporter(Storage),
  74. case ExporterMod:start_export(ExporterConf, Transfer, Filemeta) of
  75. {ok, ExportSt} ->
  76. {ok, #{
  77. mod => ExporterMod,
  78. st => ExportSt,
  79. hash => init_checksum(Filemeta),
  80. filemeta => Filemeta
  81. }};
  82. {error, _} = Error ->
  83. Error
  84. end.
  85. -spec write(export(), iodata()) ->
  86. {ok, export()} | {error, _Reason}.
  87. write(#{mod := ExporterMod, st := ExportSt, hash := Hash} = Export, Content) ->
  88. case ExporterMod:write(ExportSt, Content) of
  89. {ok, ExportStNext} ->
  90. {ok, Export#{
  91. st := ExportStNext,
  92. hash := update_checksum(Hash, Content)
  93. }};
  94. {error, _} = Error ->
  95. Error
  96. end.
  97. -spec complete(export(), emqx_ft:finopts()) ->
  98. ok | {error, _Reason}.
  99. complete(#{mod := ExporterMod, st := ExportSt, hash := Hash, filemeta := Filemeta}, Opts) ->
  100. Checksum = emqx_maybe:define(
  101. % NOTE
  102. % Checksum in `Opts` takes precedence over one in `Filemeta` according to the spec.
  103. % We do not care if they differ.
  104. maps:get(checksum, Opts, undefined),
  105. maps:get(checksum, Filemeta, undefined)
  106. ),
  107. case verify_checksum(Hash, Checksum) of
  108. {ok, ExportChecksum} ->
  109. ExporterMod:complete(ExportSt, ExportChecksum);
  110. {error, _} = Error ->
  111. _ = ExporterMod:discard(ExportSt),
  112. Error
  113. end.
  114. -spec discard(export()) ->
  115. ok | {error, _Reason}.
  116. discard(#{mod := ExporterMod, st := ExportSt}) ->
  117. ExporterMod:discard(ExportSt).
  118. -spec list(storage(), emqx_ft_storage:query(Cursor)) ->
  119. {ok, emqx_ft_storage:page(emqx_ft_storage:file_info(), Cursor)} | {error, _Reason}.
  120. list(Storage, Query) ->
  121. {ExporterMod, ExporterOpts} = exporter(Storage),
  122. ExporterMod:list(ExporterOpts, Query).
  123. %% Lifecycle
  124. -spec update_config(storage(), storage()) -> ok | {error, term()}.
  125. update_config(StorageOld, StorageNew) ->
  126. on_exporter_update(
  127. emqx_maybe:apply(fun exporter/1, StorageOld),
  128. emqx_maybe:apply(fun exporter/1, StorageNew)
  129. ).
  130. on_exporter_update(Config, Config) ->
  131. ok;
  132. on_exporter_update({ExporterMod, ConfigOld}, {ExporterMod, ConfigNew}) ->
  133. ExporterMod:update_config(ConfigOld, ConfigNew);
  134. on_exporter_update(ExporterOld, ExporterNew) ->
  135. _ = emqx_maybe:apply(fun stop/1, ExporterOld),
  136. _ = emqx_maybe:apply(fun start/1, ExporterNew),
  137. ok.
  138. start({ExporterMod, ExporterOpts}) ->
  139. ok = ExporterMod:start(ExporterOpts).
  140. stop({ExporterMod, ExporterOpts}) ->
  141. ok = ExporterMod:stop(ExporterOpts).
  142. %%------------------------------------------------------------------------------
  143. %% Internal functions
  144. %%------------------------------------------------------------------------------
  145. exporter(Storage) ->
  146. case emqx_ft_schema:backend(maps:get(exporter, Storage)) of
  147. {local, Options} ->
  148. {emqx_ft_storage_exporter_fs, Options};
  149. {s3, Options} ->
  150. {emqx_ft_storage_exporter_s3, Options}
  151. end.
  152. init_checksum(#{checksum := {Algo, _}}) ->
  153. crypto:hash_init(Algo);
  154. init_checksum(#{}) ->
  155. crypto:hash_init(sha256).
  156. update_checksum(Ctx, IoData) ->
  157. crypto:hash_update(Ctx, IoData).
  158. verify_checksum(Ctx, {Algo, Digest} = Checksum) ->
  159. case crypto:hash_final(Ctx) of
  160. Digest ->
  161. {ok, Checksum};
  162. Mismatch ->
  163. {error, {checksum_mismatch, Algo, binary:encode_hex(Mismatch)}}
  164. end;
  165. verify_checksum(Ctx, undefined) ->
  166. Digest = crypto:hash_final(Ctx),
  167. {ok, {sha256, Digest}}.