Przeglądaj źródła

fix(ds): Unified the names of the `add_generation` API

firest 2 lat temu
rodzic
commit
ed38ca67d5

+ 8 - 18
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -22,7 +22,7 @@
 -module(emqx_ds).
 
 %% Management API:
--export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]).
+-export([open_db/2, add_generation/2, add_generation/1, drop_db/1]).
 
 %% Message storage API:
 -export([store_batch/2, store_batch/3]).
@@ -124,10 +124,10 @@
 
 -callback open_db(db(), create_db_opts()) -> ok | {error, _}.
 
--callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
-
 -callback add_generation(db()) -> ok | {error, _}.
 
+-callback add_generation(db(), create_db_opts()) -> ok | {error, _}.
+
 -callback drop_db(db()) -> ok | {error, _}.
 
 -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
@@ -158,23 +158,13 @@ open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backen
     persistent_term:put(?persistent_term(DB), Module),
     ?module(DB):open_db(DB, Opts).
 
--spec update_db_config(db(), create_db_opts()) -> ok.
-update_db_config(DB, Opts) ->
-    case persistent_term:get(?persistent_term(DB), undefined) of
-        undefined ->
-            ok;
-        Module ->
-            Module:update_db_config(DB, Opts)
-    end.
-
 -spec add_generation(db()) -> ok.
 add_generation(DB) ->
-    case persistent_term:get(?persistent_term(DB), undefined) of
-        undefined ->
-            ok;
-        Module ->
-            Module:add_generation(DB)
-    end.
+    ?module(DB):add_generation(DB).
+
+-spec add_generation(db(), create_db_opts()) -> ok.
+add_generation(DB, Opts) ->
+    ?module(DB):add_generation(DB, Opts).
 
 %% @doc TODO: currently if one or a few shards are down, they won't be
 

+ 9 - 9
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -23,8 +23,8 @@
 -export([
     list_shards/1,
     open_db/2,
-    update_db_config/2,
     add_generation/1,
+    add_generation/2,
     drop_db/1,
     store_batch/3,
     get_streams/3,
@@ -41,7 +41,7 @@
     do_make_iterator_v1/5,
     do_update_iterator_v2/4,
     do_next_v1/4,
-    do_add_generation_v1/1
+    do_add_generation_v2/1
 ]).
 
 -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
@@ -124,16 +124,16 @@ open_db(DB, CreateOpts) ->
         MyShards
     ).
 
--spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
-update_db_config(DB, CreateOpts) ->
-    emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
-
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
     Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
-    _ = emqx_ds_proto_v1:add_generation(Nodes, DB),
+    _ = emqx_ds_proto_v2:add_generation(Nodes, DB),
     ok.
 
+-spec add_generation(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
+add_generation(DB, CreateOpts) ->
+    emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
+
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
@@ -297,8 +297,8 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
 
--spec do_add_generation_v1(emqx_ds:db()) -> ok | {error, _}.
-do_add_generation_v1(DB) ->
+-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
+do_add_generation_v2(DB) ->
     MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
 
     lists:foreach(

+ 3 - 3
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -470,15 +470,15 @@ filter_shards(DB, Predicte) ->
         ShardId
     end).
 
-filter_shards(DB, Predicte, Maper) ->
+filter_shards(DB, Predicate, Mapper) ->
     eval_qlc(
         qlc:q([
-            Maper(Shard)
+            Mapper(Shard)
          || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table(
                 ?SHARD_TAB
             ),
             D =:= DB,
-            Predicte(Shard)
+            Predicate(Shard)
         ])
     ).
 

+ 0 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -265,8 +265,6 @@ add_generation(ShardId) ->
 %% gen_server for the shard
 %%================================================================================
 
--define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-
 -spec start_link(shard_id(), emqx_ds:builtin_db_options()) ->
     {ok, pid()}.
 start_link(Shard = {_, _}, Options) ->

+ 1 - 7
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -24,8 +24,7 @@
     store_batch/5,
     get_streams/5,
     make_iterator/6,
-    next/5,
-    add_generation/2
+    next/5
 ]).
 
 %% behavior callbacks:
@@ -90,11 +89,6 @@ store_batch(Node, DB, Shard, Batch, Options) ->
         DB, Shard, Batch, Options
     ]).
 
--spec add_generation([node()], emqx_ds:db()) ->
-    [{ok, ok} | erpc:caught_call_exception()].
-add_generation(Node, DB) ->
-    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v1, [DB]).
-
 %%================================================================================
 %% behavior callbacks
 %%================================================================================

+ 7 - 1
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl

@@ -27,7 +27,8 @@
     next/5,
 
     %% introduced in v2
-    update_iterator/5
+    update_iterator/5,
+    add_generation/2
 ]).
 
 %% behavior callbacks:
@@ -110,6 +111,11 @@ update_iterator(Node, DB, Shard, OldIter, DSKey) ->
         DB, Shard, OldIter, DSKey
     ]).
 
+-spec add_generation([node()], emqx_ds:db()) ->
+    [{ok, ok} | erpc:caught_call_exception()].
+add_generation(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================

+ 4 - 3
apps/emqx_durable_storage/test/emqx_ds_SUITE.erl

@@ -155,7 +155,7 @@ t_05_update_iterator(_Config) ->
     ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}),
     ok.
 
-t_05_update(_Config) ->
+t_05_update_config(_Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
     TopicFilter = ['#'],
@@ -180,7 +180,7 @@ t_05_update(_Config) ->
                     {false, TimeAcc, [Msgs | MsgAcc]};
                 (Datas, {Any, TimeAcc, MsgAcc}) ->
                     timer:sleep(500),
-                    ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())),
+                    ?assertMatch(ok, emqx_ds:add_generation(DB, opts())),
                     timer:sleep(500),
                     StartTime = emqx_message:timestamp_now(),
                     Msgs = ToMsgs(Datas),
@@ -269,7 +269,8 @@ fetch_all(DB, TopicFilter, StartTime) ->
     lists:foldl(
         fun({_, Stream}, Acc) ->
             {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
-            {ok, _, Msgs} = iterate(DB, Iter0, StartTime),
+            {ok, _, Msgs0} = iterate(DB, Iter0, StartTime),
+            Msgs = lists:map(fun({_, Msg}) -> Msg end, Msgs0),
             Acc ++ Msgs
         end,
         [],