Browse Source

fix(ds): rename force_monotonic_timestamps to append_only

ieQu1 1 year ago
parent
commit
07dd736ac1

+ 1 - 1
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -332,7 +332,7 @@
 open_db(Config) ->
 open_db(Config) ->
     emqx_ds:open_db(?DB, Config#{
     emqx_ds:open_db(?DB, Config#{
         atomic_batches => true,
         atomic_batches => true,
-        force_monotonic_timestamps => false
+        append_only => false
     }).
     }).
 %% ELSE ifdef(STORE_STATE_IN_DS).
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
 -else.

+ 1 - 1
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -431,7 +431,7 @@ init() ->
     application:set_env(emqx_durable_storage, db_data_dir, Dir),
     application:set_env(emqx_durable_storage, db_data_dir, Dir),
     Defaults = #{
     Defaults = #{
         backend => builtin_local,
         backend => builtin_local,
-        force_monotonic_timestamps => false,
+        append_only => false,
         atomic_batches => true,
         atomic_batches => true,
         storage =>
         storage =>
             {emqx_ds_storage_bitfield_lts, #{
             {emqx_ds_storage_bitfield_lts, #{

+ 2 - 2
apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl

@@ -366,7 +366,7 @@ t_11_batch_preconditions(Config) ->
         begin
         begin
             DBOpts = (opts(Config))#{
             DBOpts = (opts(Config))#{
                 atomic_batches => true,
                 atomic_batches => true,
-                force_monotonic_timestamps => false
+                append_only => false
             },
             },
             ?assertMatch(ok, emqx_ds_open_db(DB, DBOpts)),
             ?assertMatch(ok, emqx_ds_open_db(DB, DBOpts)),
 
 
@@ -423,7 +423,7 @@ t_12_batch_precondition_conflicts(Config) ->
         begin
         begin
             DBOpts = (opts(Config))#{
             DBOpts = (opts(Config))#{
                 atomic_batches => true,
                 atomic_batches => true,
-                force_monotonic_timestamps => false
+                append_only => false
             },
             },
             ?assertMatch(ok, emqx_ds_open_db(DB, DBOpts)),
             ?assertMatch(ok, emqx_ds_open_db(DB, DBOpts)),
 
 

+ 7 - 4
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -102,9 +102,9 @@
         storage := emqx_ds_storage_layer:prototype(),
         storage := emqx_ds_storage_layer:prototype(),
         n_shards := pos_integer(),
         n_shards := pos_integer(),
         poll_workers_per_shard => pos_integer(),
         poll_workers_per_shard => pos_integer(),
-        %% Inherited from `emqx_ds:generic_db_opts()`.
-        force_monotonic_timestamps => boolean(),
-        atomic_batches => boolean()
+        %% Equivalent to `append_only' from `emqx_ds:create_db_opts':
+        force_monotonic_timestamps := boolean(),
+        atomic_batches := boolean()
     }.
     }.
 
 
 -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
 -type generation_rank() :: {shard(), emqx_ds_storage_layer:gen_id()}.
@@ -121,7 +121,10 @@
 %%================================================================================
 %%================================================================================
 
 
 -spec open_db(emqx_ds:db(), db_opts()) -> ok | {error, _}.
 -spec open_db(emqx_ds:db(), db_opts()) -> ok | {error, _}.
-open_db(DB, CreateOpts) ->
+open_db(DB, CreateOpts0) ->
+    %% Rename `append_only' flag to `force_monotonic_timestamps':
+    {AppendOnly, CreateOpts1} = maps:take(append_only, CreateOpts0),
+    CreateOpts = maps:put(force_monotonic_timestamps, AppendOnly, CreateOpts1),
     case emqx_ds_builtin_local_sup:start_db(DB, CreateOpts) of
     case emqx_ds_builtin_local_sup:start_db(DB, CreateOpts) of
         {ok, _} ->
         {ok, _} ->
             ok;
             ok;

+ 5 - 2
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -116,7 +116,7 @@
         n_sites => pos_integer(),
         n_sites => pos_integer(),
         replication_factor => pos_integer(),
         replication_factor => pos_integer(),
         replication_options => _TODO :: #{},
         replication_options => _TODO :: #{},
-        %% Inherited from `emqx_ds:generic_db_opts()`.
+        %% Equivalent to `append_only' from `emqx_ds:create_db_opts'
         force_monotonic_timestamps => boolean(),
         force_monotonic_timestamps => boolean(),
         atomic_batches => boolean()
         atomic_batches => boolean()
     }.
     }.
@@ -193,7 +193,10 @@ list_shards(DB) ->
     emqx_ds_replication_layer_meta:shards(DB).
     emqx_ds_replication_layer_meta:shards(DB).
 
 
 -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
 -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
-open_db(DB, CreateOpts) ->
+open_db(DB, CreateOpts0) ->
+    %% Rename `append_only' flag to `force_monotonic_timestamps':
+    {AppendOnly, CreateOpts1} = maps:take(append_only, CreateOpts0),
+    CreateOpts = maps:put(force_monotonic_timestamps, AppendOnly, CreateOpts1),
     case emqx_ds_builtin_raft_sup:start_db(DB, CreateOpts) of
     case emqx_ds_builtin_raft_sup:start_db(DB, CreateOpts) of
         {ok, _} ->
         {ok, _} ->
             ok;
             ok;

+ 1 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_store.erl

@@ -113,7 +113,7 @@ db_config() ->
 tune_db_config(Config0 = #{backend := Backend}) ->
 tune_db_config(Config0 = #{backend := Backend}) ->
     Config = Config0#{
     Config = Config0#{
         %% We need total control over timestamp assignment.
         %% We need total control over timestamp assignment.
-        force_monotonic_timestamps => false
+        append_only => false
     },
     },
     case Backend of
     case Backend of
         B when B == builtin_raft; B == builtin_local ->
         B when B == builtin_raft; B == builtin_local ->

+ 44 - 12
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -200,28 +200,60 @@
         sync => boolean()
         sync => boolean()
     }.
     }.
 
 
--type generic_db_opts() ::
+%% This type specifies the backend and some generic options that
+%% affect the semantics of DS operations.
+%%
+%% All backends MUST handle all options listed here; even if it means
+%% throwing an exception that says that certain option is not
+%% supported.
+%%
+%% How to add a new option:
+%%
+%% 1. Create a PR modifying this type and adding a stub implementation
+%% to all backends that throws "option unsupported" error.
+%%
+%% 2. Get everyone fully on-board about the semantics and name of the
+%% new option. Merge the PR.
+%%
+%% 3. Implement business logic reliant on the new option, and its
+%% support in the backends.
+%%
+%% 4. If the new option is not supported by all backends, it's up to
+%% the business logic to choose the right one.
+%%
+%% 5. Default value for the option MUST be set in `emqx_ds:open_db'
+%% function. The backends SHOULD NOT make any assumptions about the
+%% default values for common options.
+-type create_db_opts() ::
     #{
     #{
         backend := atom(),
         backend := atom(),
-        %% Force strictly monotonic message timestamps.
-        %% Default: `true'.
-        %% Messages are assigned unique, strictly monotonically increasing timestamps.
-        %% Those timestamps form a total order per each serialization key.
-        %% If `false' then message timestamps are respected; timestamp, topic and
-        %% serialization key uniquely identify a message.
-        force_monotonic_timestamps => boolean(),
+        %% `append_only' option ensures that the backend will take
+        %% measures to avoid overwriting messages, even if their
+        %% fields (topic, timestamp, GUID and client ID, ... or any
+        %% combination of thereof) match. This option is `true' by
+        %% default.
+        %%
+        %% When this flag is `false':
+        %%
+        %% - Messages published by the same client WILL be overwritten
+        %% if thier topic and timestamp match.
+        %%
+        %% - Messages published with the same topic and timestamp by
+        %% different clients MAY be overwritten.
+        %%
+        %% The API consumer must design the topic structure
+        %% accordingly.
+        append_only => boolean(),
         %% Whether the whole batch given to `store_batch' should be processed and
         %% Whether the whole batch given to `store_batch' should be processed and
         %% inserted atomically as a unit, in isolation from other batches.
         %% inserted atomically as a unit, in isolation from other batches.
         %% Default: `false'.
         %% Default: `false'.
         %% The whole batch must be crafted so that it belongs to a single shard (if
         %% The whole batch must be crafted so that it belongs to a single shard (if
         %% applicable to the backend).
         %% applicable to the backend).
         atomic_batches => boolean(),
         atomic_batches => boolean(),
-        serialize_by => clientid | topic,
+        %% Backend-specific options:
         _ => _
         _ => _
     }.
     }.
 
 
--type create_db_opts() :: generic_db_opts().
-
 -type poll_opts() ::
 -type poll_opts() ::
     #{
     #{
         %% Expire poll request after this timeout
         %% Expire poll request after this timeout
@@ -500,7 +532,7 @@ timestamp_us() ->
 
 
 set_db_defaults(Opts) ->
 set_db_defaults(Opts) ->
     Defaults = #{
     Defaults = #{
-        force_monotonic_timestamps => true,
+        append_only => true,
         atomic_batches => false
         atomic_batches => false
     },
     },
     maps:merge(Defaults, Opts).
     maps:merge(Defaults, Opts).

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_storage_layout_SUITE.erl

@@ -74,7 +74,7 @@ t_store(_Config) ->
 
 
 %% Smoke test of applying batch operations
 %% Smoke test of applying batch operations
 t_operations(db_config, _Config) ->
 t_operations(db_config, _Config) ->
-    #{force_monotonic_timestamps => false}.
+    #{append_only => false}.
 
 
 t_operations(_Config) ->
 t_operations(_Config) ->
     Batch1 = [
     Batch1 = [