Просмотр исходного кода

Merge pull request #12070 from ieQu1/dev/ds-min-max-batch-size

Various minor fixes
ieQu1 2 лет назад
Родитель
Сommit
476c300ecf

+ 2 - 1
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -169,7 +169,8 @@ commit_offset(
 -spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) ->
     {emqx_session:replies(), inflight()}.
 poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE ->
-    FetchThreshold = max(1, WindowSize div 2),
+    MinBatchSize = emqx_config:get([session_persistence, min_batch_size]),
+    FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)),
     FreeSpace = WindowSize - n_inflight(Inflight0),
     case FreeSpace >= FetchThreshold of
         false ->

+ 35 - 36
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -96,6 +96,12 @@
     props := map(),
     extra := map()
 }.
+
+-define(TIMER_PULL, timer_pull).
+-define(TIMER_GET_STREAMS, timer_get_streams).
+-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
+-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
+
 -type session() :: #{
     %% Client ID
     id := id(),
@@ -111,6 +117,8 @@
     receive_maximum := pos_integer(),
     %% Connection Info
     conninfo := emqx_types:conninfo(),
+    %% Timers
+    timer() => reference(),
     %%
     props := map()
 }.
@@ -120,7 +128,6 @@
 -type clientinfo() :: emqx_types:clientinfo().
 -type conninfo() :: emqx_session:conninfo().
 -type replies() :: emqx_session:replies().
--type timer() :: pull | get_streams | bump_last_alive_at.
 
 -define(STATS_KEYS, [
     subscriptions_cnt,
@@ -144,8 +151,7 @@
     session().
 create(#{clientid := ClientID}, ConnInfo, Conf) ->
     % TODO: expiration
-    ensure_timers(),
-    ensure_session(ClientID, ConnInfo, Conf).
+    ensure_timers(ensure_session(ClientID, ConnInfo, Conf)).
 
 -spec open(clientinfo(), conninfo()) ->
     {_IsPresent :: true, session(), []} | false.
@@ -159,10 +165,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
     ok = emqx_cm:discard_session(ClientID),
     case session_open(ClientID, ConnInfo) of
         Session0 = #{} ->
-            ensure_timers(),
             ReceiveMaximum = receive_maximum(ConnInfo),
             Session = Session0#{receive_maximum => ReceiveMaximum},
-            {true, Session, []};
+            {true, ensure_timers(Session), []};
         false ->
             false
     end.
@@ -333,9 +338,9 @@ publish(_PacketId, Msg, Session) ->
 puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
     case emqx_persistent_message_ds_replayer:commit_offset(Id, ack, PacketId, Inflight0) of
         {true, Inflight} ->
-            %% TODO
+            %% TODO: we pass a bogus message into the hook:
             Msg = emqx_message:make(Id, <<>>, <<>>),
-            {ok, Msg, [], Session#{inflight => Inflight}};
+            {ok, Msg, [], pull_now(Session#{inflight => Inflight})};
         {false, _} ->
             %% Invalid Packet Id
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@@ -351,9 +356,9 @@ puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) ->
 pubrec(PacketId, Session = #{id := Id, inflight := Inflight0}) ->
     case emqx_persistent_message_ds_replayer:commit_offset(Id, rec, PacketId, Inflight0) of
         {true, Inflight} ->
-            %% TODO
+            %% TODO: we pass a bogus message into the hook:
             Msg = emqx_message:make(Id, <<>>, <<>>),
-            {ok, Msg, Session#{inflight => Inflight}};
+            {ok, Msg, pull_now(Session#{inflight => Inflight})};
         {false, _} ->
             %% Invalid Packet Id
             {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@@ -399,9 +404,11 @@ deliver(_ClientInfo, _Delivers, Session) ->
     {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
 handle_timeout(
     _ClientInfo,
-    pull,
-    Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
+    ?TIMER_PULL,
+    Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
 ) ->
+    MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]),
+    BatchSize = min(ReceiveMaximum, MaxBatchSize),
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(
         fun
             (_Seqno, Message = #message{qos = ?QOS_0}) ->
@@ -412,7 +419,7 @@ handle_timeout(
         end,
         Id,
         Inflight0,
-        ReceiveMaximum
+        BatchSize
     ),
     IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]),
     Timeout =
@@ -422,13 +429,12 @@ handle_timeout(
             [_ | _] ->
                 0
         end,
-    ensure_timer(pull, Timeout),
-    {ok, Publishes, Session#{inflight := Inflight}};
-handle_timeout(_ClientInfo, get_streams, Session) ->
+    Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}),
+    {ok, Publishes, Session};
+handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
     renew_streams(Session),
-    ensure_timer(get_streams),
-    {ok, [], Session};
-handle_timeout(_ClientInfo, bump_last_alive_at, Session0) ->
+    {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)};
+handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
     %% Note: we take a pessimistic approach here and assume that the client will be alive
     %% until the next bump timeout.  With this, we avoid garbage collecting this session
     %% too early in case the session/connection/node crashes earlier without having time
@@ -436,8 +442,8 @@ handle_timeout(_ClientInfo, bump_last_alive_at, Session0) ->
     BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
     EstimatedLastAliveAt = now_ms() + BumpInterval,
     Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt),
-    ensure_timer(bump_last_alive_at),
-    {ok, [], Session}.
+    BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
+    {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}.
 
 -spec replay(clientinfo(), [], session()) ->
     {ok, replies(), session()}.
@@ -957,22 +963,15 @@ export_record(_, _, [], Acc) ->
 
 %% TODO: find a more reliable way to perform actions that have side
 %% effects. Add `CBM:init' callback to the session behavior?
-ensure_timers() ->
-    ensure_timer(pull),
-    ensure_timer(get_streams),
-    ensure_timer(bump_last_alive_at).
-
--spec ensure_timer(timer()) -> ok.
-ensure_timer(bump_last_alive_at = Type) ->
-    BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
-    ensure_timer(Type, BumpInterval);
-ensure_timer(Type) ->
-    ensure_timer(Type, 100).
-
--spec ensure_timer(timer(), non_neg_integer()) -> ok.
-ensure_timer(Type, Timeout) ->
-    _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
-    ok.
+-spec ensure_timers(session()) -> session().
+ensure_timers(Session0) ->
+    Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0),
+    Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
+    emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
+
+-spec pull_now(session()) -> session().
+pull_now(Session) ->
+    emqx_session:reset_timer(?TIMER_PULL, 0, Session).
 
 -spec receive_maximum(conninfo()) -> pos_integer().
 receive_maximum(ConnInfo) ->

+ 16 - 0
apps/emqx/src/emqx_schema.erl

@@ -1773,6 +1773,22 @@ fields("session_persistence") ->
                     }
                 }
             )},
+        {"max_batch_size",
+            sc(
+                pos_integer(),
+                #{
+                    default => 1000,
+                    desc => ?DESC(session_ds_max_batch_size)
+                }
+            )},
+        {"min_batch_size",
+            sc(
+                pos_integer(),
+                #{
+                    default => 100,
+                    desc => ?DESC(session_ds_min_batch_size)
+                }
+            )},
         {"idle_poll_interval",
             sc(
                 timeout_duration(),

+ 15 - 20
apps/emqx/src/emqx_session.erl

@@ -111,8 +111,7 @@
     reply/0,
     replies/0,
     common_timer_name/0,
-    custom_timer_name/0,
-    timerset/0
+    custom_timer_name/0
 ]).
 
 -type session_id() :: _TODO.
@@ -154,8 +153,6 @@
     emqx_session_mem:session()
     | emqx_persistent_session_ds:session().
 
--type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}.
-
 -define(INFO_KEYS, [
     id,
     created_at,
@@ -477,28 +474,26 @@ handle_timeout(ClientInfo, Timer, Session) ->
 
 %%--------------------------------------------------------------------
 
--spec ensure_timer(custom_timer_name(), timeout(), timerset()) ->
-    timerset().
-ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) ->
-    Timers;
-ensure_timer(Name, Time, Timers = #{}) when Time > 0 ->
+-spec ensure_timer(custom_timer_name(), timeout(), map()) ->
+    map().
+ensure_timer(Name, Time, Timers = #{}) when Time >= 0 ->
     TRef = emqx_utils:start_timer(Time, {?MODULE, Name}),
     Timers#{Name => TRef}.
 
--spec reset_timer(custom_timer_name(), timeout(), timerset()) ->
-    timerset().
-reset_timer(Name, Time, Channel) ->
-    ensure_timer(Name, Time, cancel_timer(Name, Channel)).
+-spec reset_timer(custom_timer_name(), timeout(), map()) ->
+    map().
+reset_timer(Name, Time, Timers) ->
+    ensure_timer(Name, Time, cancel_timer(Name, Timers)).
 
--spec cancel_timer(custom_timer_name(), timerset()) ->
-    timerset().
-cancel_timer(Name, Timers) ->
-    case maps:take(Name, Timers) of
-        {TRef, NTimers} ->
+-spec cancel_timer(custom_timer_name(), map()) ->
+    map().
+cancel_timer(Name, Timers0) ->
+    case maps:take(Name, Timers0) of
+        {TRef, Timers} ->
             ok = emqx_utils:cancel_timer(TRef),
-            NTimers;
+            Timers;
         error ->
-            Timers
+            Timers0
     end.
 
 %%--------------------------------------------------------------------

+ 7 - 6
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -145,15 +145,16 @@ assert_messages_missed(Ls1, Ls2) ->
 
 assert_messages_order([], []) ->
     ok;
-assert_messages_order([Msg | Ls1], [#{payload := No} | Ls2]) ->
-    case emqx_message:payload(Msg) == No of
-        false ->
+assert_messages_order([Msg | Expected], Received) ->
+    %% Account for duplicate messages:
+    case lists:splitwith(fun(#{payload := P}) -> emqx_message:payload(Msg) == P end, Received) of
+        {[], [#{payload := Mismatch} | _]} ->
             ct:fail("Message order is not correct, expected: ~p, received: ~p", [
-                emqx_message:payload(Msg), No
+                emqx_message:payload(Msg), Mismatch
             ]),
             error;
-        true ->
-            assert_messages_order(Ls1, Ls2)
+        {_Matching, Rest} ->
+            assert_messages_order(Expected, Rest)
     end.
 
 messages(Offset, Cnt) ->

+ 28 - 14
apps/emqx_durable_storage/README.md

@@ -1,36 +1,50 @@
 # EMQX Replay
 
-`emqx_ds` is a generic durable storage for MQTT messages within EMQX.
+`emqx_ds` is an application implementing durable storage for MQTT messages within EMQX.
 
-Concepts:
+# Features
 
+- Streams. Stream is an abstraction that encompasses topics, shards, different data layouts, etc.
+  The client application must only aware of the streams.
 
+- Batching. All the API functions are batch-oriented.
 
-> 0. App overview introduction
-> 1. let people know what your project can do specifically. Is it a base
-> library dependency, or what kind of functionality is provided to the user?
-> 2. Provide context and add a link to any reference visitors might be
-> unfamiliar with.
-> 3. Design details, implementation technology architecture, Roadmap, etc.
+- Iterators. Iterators can be stored durably or transferred over network.
+  They take relatively small space.
 
-# [Features] - [Optional]
-> A List of features your application provided. If the feature is quite simple, just
-> list in the previous section.
+- Support for various backends. Almost any DBMS that supports range
+  queries can serve as a `emqx_durable_storage` backend.
+
+- Builtin backend based on RocksDB.
+  - Changing storage layout on the fly: it's achieved by creating a
+    new set of tables (known as "generation") and the schema.
+  - Sharding based on publisher's client ID
 
 # Limitation
-TBD
+
+- Builtin backend currently doesn't replicate data across different sites
+- There is no local cache of messages, which may result in transferring the same data multiple times
 
 # Documentation links
 TBD
 
 # Usage
-TBD
+
+Currently it's only used to implement persistent sessions.
+
+In the future it can serve as a storage for retained messages or as a generic message buffering layer for the bridges.
 
 # Configurations
-TBD
+
+`emqx_durable_storage` doesn't have any configurable parameters.
+Instead, it relies on the upper-level business applications to create
+a correct configuration and pass it to `emqx_ds:open_db(DBName, Config)`
+function according to its needs.
 
 # HTTP APIs
 
+None
+
 # Other
 TBD
 

+ 4 - 2
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -69,7 +69,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
     | {ok, end_of_stream}
     | {error, _}.
 next(Node, DB, Shard, Iter, BatchSize) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
 
 -spec store_batch(
     node(),
@@ -80,7 +80,9 @@ next(Node, DB, Shard, Iter, BatchSize) ->
 ) ->
     emqx_ds:store_batch_result().
 store_batch(Node, DB, Shard, Batch, Options) ->
-    erpc:call(Node, emqx_ds_replication_layer, do_store_batch_v1, [DB, Shard, Batch, Options]).
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
+        DB, Shard, Batch, Options
+    ]).
 
 %%================================================================================
 %% behavior callbacks

+ 12 - 0
rel/i18n/emqx_schema.hocon

@@ -1577,4 +1577,16 @@ session_ds_session_gc_interval.desc:
 session_ds_session_gc_batch_size.desc:
 """The size of each batch of expired persistent sessions to be garbage collected per iteration."""
 
+session_ds_max_batch_size.desc:
+"""This value affects the flow control for the persistent sessions.
+The session queries the DB for the new messages in batches.
+Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller."""
+
+session_ds_min_batch_size.desc:
+"""This value affects the flow control for the persistent sessions.
+The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller.
+
+`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
+
+
 }