| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_ft_storage).
- -include_lib("emqx/include/types.hrl").
- -export(
- [
- store_filemeta/2,
- store_segment/2,
- assemble/3,
- kickoff/1,
- files/0,
- files/1,
- with_storage_type/2,
- with_storage_type/3,
- backend/0,
- update_config/2
- ]
- ).
- -type type() :: local.
- -type backend() :: {type(), storage()}.
- -type storage() :: config().
- -type config() :: emqx_config:config().
- -export_type([backend/0]).
- -export_type([assemble_callback/0]).
- -export_type([query/1]).
- -export_type([page/2]).
- -export_type([file_info/0]).
- -export_type([export_data/0]).
- -export_type([reader/0]).
- -type assemble_callback() :: fun((ok | {error, term()}) -> any()).
- -type query(Cursor) ::
- #{transfer => emqx_ft:transfer()}
- | #{
- limit => non_neg_integer(),
- following => Cursor
- }.
- -type page(Item, Cursor) :: #{
- items := [Item],
- cursor => Cursor
- }.
- -type file_info() :: #{
- transfer := emqx_ft:transfer(),
- name := file:name(),
- size := _Bytes :: non_neg_integer(),
- timestamp := emqx_utils_calendar:epoch_second(),
- uri => uri_string:uri_string(),
- meta => emqx_ft:filemeta()
- }.
- -type export_data() :: binary() | qlc:query_handle().
- -type reader() :: pid().
- %%--------------------------------------------------------------------
- %% Behaviour
- %%--------------------------------------------------------------------
- %% NOTE
- %% An async task will wait for a `kickoff` message to start processing, to give some time
- %% to set up monitors, etc. Async task will not explicitly report the processing result,
- %% you are expected to receive and handle exit reason of the process, which is
- %% -type result() :: `{shutdown, ok | {error, _}}`.
- -callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
- ok | {async, pid()} | {error, term()}.
- -callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
- ok | {async, pid()} | {error, term()}.
- -callback assemble(storage(), emqx_ft:transfer(), _Size :: emqx_ft:bytes(), emqx_ft:finopts()) ->
- ok | {async, pid()} | {error, term()}.
- -callback files(storage(), query(Cursor)) ->
- {ok, page(file_info(), Cursor)} | {error, term()}.
- -callback start(storage()) -> any().
- -callback stop(storage()) -> any().
- -callback update_config(_OldConfig :: option(storage()), _NewConfig :: option(storage())) ->
- any().
- %%--------------------------------------------------------------------
- %% API
- %%--------------------------------------------------------------------
- -spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
- ok | {async, pid()} | {error, term()}.
- store_filemeta(Transfer, FileMeta) ->
- dispatch(store_filemeta, [Transfer, FileMeta]).
- -spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
- ok | {async, pid()} | {error, term()}.
- store_segment(Transfer, Segment) ->
- dispatch(store_segment, [Transfer, Segment]).
- -spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
- ok | {async, pid()} | {error, term()}.
- assemble(Transfer, Size, FinOpts) ->
- dispatch(assemble, [Transfer, Size, FinOpts]).
- -spec kickoff(pid()) -> ok.
- kickoff(Pid) ->
- _ = erlang:send(Pid, kickoff),
- ok.
- %%
- -spec files() ->
- {ok, page(file_info(), _)} | {error, term()}.
- files() ->
- files(#{}).
- -spec files(query(Cursor)) ->
- {ok, page(file_info(), Cursor)} | {error, term()}.
- files(Query) ->
- dispatch(files, [Query]).
- -spec dispatch(atom(), list(term())) -> any().
- dispatch(Fun, Args) when is_atom(Fun) ->
- {Type, Storage} = backend(),
- apply(mod(Type), Fun, [Storage | Args]).
- %%
- -spec with_storage_type(atom(), atom() | function()) -> any().
- with_storage_type(Type, Fun) ->
- with_storage_type(Type, Fun, []).
- -spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
- with_storage_type(Type, Fun, Args) ->
- case backend() of
- {Type, Storage} when is_atom(Fun) ->
- apply(mod(Type), Fun, [Storage | Args]);
- {Type, Storage} when is_function(Fun) ->
- apply(Fun, [Storage | Args]);
- {_, _} = Backend ->
- {error, {invalid_storage_backend, Backend}}
- end.
- %%
- -spec backend() -> backend().
- backend() ->
- backend(emqx_ft_conf:storage()).
- -spec update_config(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
- ok.
- update_config(ConfigOld, ConfigNew) ->
- on_backend_update(
- emqx_maybe:apply(fun backend/1, ConfigOld),
- emqx_maybe:apply(fun backend/1, ConfigNew)
- ).
- on_backend_update({Type, _} = Backend, {Type, _} = Backend) ->
- ok;
- on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
- ok = (mod(Type)):update_config(StorageOld, StorageNew);
- on_backend_update(BackendOld, BackendNew) when
- (BackendOld =:= undefined orelse is_tuple(BackendOld)) andalso
- (BackendNew =:= undefined orelse is_tuple(BackendNew))
- ->
- _ = emqx_maybe:apply(fun stop_backend/1, BackendOld),
- _ = emqx_maybe:apply(fun start_backend/1, BackendNew),
- ok.
- %%--------------------------------------------------------------------
- %% Local API
- %%--------------------------------------------------------------------
- -spec backend(config()) -> backend().
- backend(Config) ->
- emqx_ft_schema:backend(Config).
- start_backend({Type, Storage}) ->
- (mod(Type)):start(Storage).
- stop_backend({Type, Storage}) ->
- (mod(Type)):stop(Storage).
- mod(local) ->
- emqx_ft_storage_fs.
|