Prechádzať zdrojové kódy

chore(retainer): add backend creation to the behaviour

Ilya Averyanov 2 rokov pred
rodič
commit
aa2ee9c409

+ 18 - 29
apps/emqx_retainer/src/emqx_retainer.erl

@@ -63,7 +63,6 @@
 
 -type state() :: #{
     enable := boolean(),
-    context_id := non_neg_integer(),
     context := undefined | context(),
     clear_timer := undefined | reference()
 }.
@@ -72,8 +71,8 @@
 -define(DEF_EXPIRY_INTERVAL, 0).
 -define(MAX_PAYLOAD_SIZE_CONFIG_PATH, [retainer, max_payload_size]).
 
--define(CAST(Msg), gen_server:cast(?MODULE, Msg)).
-
+-callback create(map()) -> context().
+-callback close(context()) -> ok.
 -callback delete_message(context(), topic()) -> ok.
 -callback store_retained(context(), message()) -> ok.
 -callback read_message(context(), topic()) -> {ok, list()}.
@@ -264,15 +263,10 @@ code_change(_OldVsn, State, _Extra) ->
 new_state() ->
     #{
         enable => false,
-        context_id => 0,
         context => undefined,
         clear_timer => undefined
     }.
 
--spec new_context(pos_integer()) -> context().
-new_context(Id) ->
-    #{context_id => Id}.
-
 payload_size_limit() ->
     emqx_conf:get(?MAX_PAYLOAD_SIZE_CONFIG_PATH, ?DEF_MAX_PAYLOAD_SIZE).
 
@@ -361,18 +355,16 @@ update_config(
 
 -spec enable_retainer(state(), hocon:config()) -> state().
 enable_retainer(
-    #{context_id := ContextId} = State,
+    State,
     #{
         msg_clear_interval := ClearInterval,
         backend := BackendCfg
     }
 ) ->
-    NewContextId = ContextId + 1,
-    Context = create_resource(new_context(NewContextId), BackendCfg),
+    Context = create(BackendCfg),
     load(Context),
     State#{
         enable := true,
-        context_id := NewContextId,
         context := Context,
         clear_timer := add_timer(ClearInterval, clear_expired)
     }.
@@ -385,7 +377,7 @@ disable_retainer(
     } = State
 ) ->
     unload(),
-    ok = close_resource(Context),
+    ok = close(Context),
     State#{
         enable := false,
         clear_timer := stop_timer(ClearTimer)
@@ -416,22 +408,19 @@ check_timer(Timer, _, _) ->
 
 -spec get_backend_module() -> backend().
 get_backend_module() ->
-    ModName =
-        case emqx:get_config([retainer, backend]) of
-            #{type := built_in_database} -> mnesia;
-            #{type := Backend} -> Backend
-        end,
-    erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])).
-
-create_resource(Context, #{type := built_in_database} = Cfg) ->
-    emqx_retainer_mnesia:create_resource(Cfg),
-    Context.
-
--spec close_resource(context()) -> ok | {error, term()}.
-close_resource(#{resource_id := ResourceId}) ->
-    emqx_resource:stop(ResourceId);
-close_resource(_) ->
-    ok.
+    case emqx:get_config([retainer, backend]) of
+        #{type := built_in_database} -> emqx_retainer_mnesia;
+        #{type := Backend} -> Backend
+    end.
+
+create(#{type := built_in_database} = Cfg) ->
+    Mod = get_backend_module(),
+    Mod:create(Cfg).
+
+-spec close(context()) -> ok | {error, term()}.
+close(Context) ->
+    Mod = get_backend_module(),
+    Mod:close(Context).
 
 -spec load(context()) -> ok.
 load(Context) ->

+ 14 - 10
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -26,6 +26,8 @@
 
 %% emqx_retainer callbacks
 -export([
+    create/1,
+    close/1,
     delete_message/2,
     store_retained/2,
     read_message/2,
@@ -46,8 +48,6 @@
 %% Management API:
 -export([topics/0]).
 
--export([create_resource/1]).
-
 -export([reindex/2, reindex_status/0]).
 
 -ifdef(TEST).
@@ -78,7 +78,7 @@ topics() ->
 %% emqx_retainer callbacks
 %%--------------------------------------------------------------------
 
-create_resource(#{storage_type := StorageType}) ->
+create(#{storage_type := StorageType}) ->
     ok = create_table(
         ?TAB_INDEX_META,
         retained_index_meta,
@@ -100,7 +100,9 @@ create_resource(#{storage_type := StorageType}) ->
         record_info(fields, retained_index),
         ordered_set,
         StorageType
-    ).
+    ),
+    %% The context is not used by this backend
+    #{}.
 
 create_table(Table, RecordName, Attributes, Type, StorageType) ->
     Copies =
@@ -136,7 +138,9 @@ create_table(Table, RecordName, Attributes, Type, StorageType) ->
             ok
     end.
 
-store_retained(_, Msg = #message{topic = Topic}) ->
+close(_Context) -> ok.
+
+store_retained(_Context, Msg = #message{topic = Topic}) ->
     ExpiryTime = emqx_retainer:get_expiry_time(Msg),
     Tokens = topic_to_tokens(Topic),
     case is_table_full() andalso is_new_topic(Tokens) of
@@ -172,7 +176,7 @@ clear_expired() ->
     QC = qlc:cursor(QH),
     clear_batch(dirty_indices(write), QC).
 
-delete_message(_, Topic) ->
+delete_message(_Context, Topic) ->
     Tokens = topic_to_tokens(Topic),
     case emqx_topic:wildcard(Topic) of
         false ->
@@ -188,10 +192,10 @@ delete_message(_, Topic) ->
             )
     end.
 
-read_message(_, Topic) ->
+read_message(_Context, Topic) ->
     {ok, read_messages(Topic)}.
 
-match_messages(_, Topic, undefined) ->
+match_messages(_Context, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
     QH = msg_table(search_table(Tokens, Now)),
@@ -202,7 +206,7 @@ match_messages(_, Topic, undefined) ->
             Cursor = qlc:cursor(QH),
             match_messages(undefined, Topic, {Cursor, BatchNum})
     end;
-match_messages(_, _Topic, {Cursor, BatchNum}) ->
+match_messages(_Context, _Topic, {Cursor, BatchNum}) ->
     case qlc_next_answers(Cursor, BatchNum) of
         {closed, Rows} ->
             {ok, Rows, undefined};
@@ -210,7 +214,7 @@ match_messages(_, _Topic, {Cursor, BatchNum}) ->
             {ok, Rows, {Cursor, BatchNum}}
     end.
 
-page_read(_, Topic, Page, Limit) ->
+page_read(_Context, Topic, Page, Limit) ->
     Now = erlang:system_time(millisecond),
     QH =
         case Topic of