|
|
@@ -326,28 +326,43 @@ dispatch(Context, Topic) ->
|
|
|
|
|
|
-spec delete_message(context(), topic()) -> ok.
|
|
|
delete_message(Context, Topic) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:delete_message(BackendState, Topic).
|
|
|
+ with_backend_module(
|
|
|
+ Context,
|
|
|
+ fun(Mod, BackendState) ->
|
|
|
+ Mod:delete_message(BackendState, Topic)
|
|
|
+ end,
|
|
|
+ ok
|
|
|
+ ).
|
|
|
|
|
|
-spec read_message(context(), topic()) -> {ok, list(message())}.
|
|
|
read_message(Context, Topic) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:read_message(BackendState, Topic).
|
|
|
+ with_backend_module(
|
|
|
+ Context,
|
|
|
+ fun(Mod, BackendState) -> Mod:read_message(BackendState, Topic) end,
|
|
|
+ {ok, []}
|
|
|
+ ).
|
|
|
|
|
|
-spec page_read(context(), emqx_maybe:t(topic()), deadline(), non_neg_integer(), non_neg_integer()) ->
|
|
|
{ok, has_next(), list(message())}.
|
|
|
page_read(Context, Topic, Deadline, Page, Limit) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:page_read(BackendState, Topic, Deadline, Page, Limit).
|
|
|
+ with_backend_module(
|
|
|
+ Context,
|
|
|
+ fun(Mod, BackendState) -> Mod:page_read(BackendState, Topic, Deadline, Page, Limit) end,
|
|
|
+ {ok, false, []}
|
|
|
+ ).
|
|
|
|
|
|
-spec count(context()) -> non_neg_integer().
|
|
|
count(Context) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:size(BackendState).
|
|
|
+ with_backend_module(Context, fun(Mod, BackendState) -> Mod:size(BackendState) end, 0).
|
|
|
+
|
|
|
+with_backend_module(Context, Fun, Default) ->
|
|
|
+ case backend_module(Context) of
|
|
|
+ undefined ->
|
|
|
+ Default;
|
|
|
+ Mod ->
|
|
|
+ BackendState = backend_state(Context),
|
|
|
+ Fun(Mod, BackendState)
|
|
|
+ end.
|
|
|
|
|
|
-spec start_clear_expired(context()) -> ok.
|
|
|
start_clear_expired(Context) ->
|
|
|
@@ -371,16 +386,16 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->
|
|
|
limit => Limit
|
|
|
});
|
|
|
_ ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:store_retained(BackendState, Msg)
|
|
|
+ with_backend_module(
|
|
|
+ Context,
|
|
|
+ fun(Mod, BackendState) -> Mod:store_retained(BackendState, Msg) end,
|
|
|
+ ok
|
|
|
+ )
|
|
|
end.
|
|
|
|
|
|
-spec clean(context()) -> ok.
|
|
|
clean(Context) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- BackendState = backend_state(Context),
|
|
|
- Mod:clean(BackendState).
|
|
|
+ with_backend_module(Context, fun(Mod, BackendState) -> Mod:clean(BackendState) end, ok).
|
|
|
|
|
|
-spec update_config(state(), hocon:config(), hocon:config()) -> state().
|
|
|
update_config(State, NewConfig, OldConfig) ->
|
|
|
@@ -512,8 +527,7 @@ create(Cfg) ->
|
|
|
|
|
|
-spec close(context()) -> ok | {error, term()}.
|
|
|
close(Context) ->
|
|
|
- Mod = backend_module(Context),
|
|
|
- Mod:close(Context).
|
|
|
+ with_backend_module(Context, fun(Mod, BackendState) -> Mod:close(BackendState) end, ok).
|
|
|
|
|
|
-spec load(context()) -> ok.
|
|
|
load(Context) ->
|