Просмотр исходного кода

Merge pull request #11914 from ieQu1/dev/speed-up-poll

fix(ds): Speed up polling for the new messages
ieQu1 2 лет назад
Родитель
Сommit
42a9f0eb6b

+ 4 - 0
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -28,6 +28,10 @@
 
 -include("emqx_persistent_session_ds.hrl").
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 %%================================================================================
 %% Type declarations
 %%================================================================================

+ 14 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -344,7 +344,15 @@ deliver(_ClientInfo, _Delivers, Session) ->
 handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) ->
     WindowSize = 100,
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize),
-    ensure_timer(pull),
+    %% TODO: make these values configurable:
+    Timeout =
+        case Publishes of
+            [] ->
+                100;
+            [_ | _] ->
+                0
+        end,
+    ensure_timer(pull, Timeout),
     {ok, Publishes, Session#{inflight => Inflight}};
 handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) ->
     renew_streams(Id),
@@ -714,5 +722,9 @@ ensure_timers() ->
 
 -spec ensure_timer(pull | get_streams) -> ok.
 ensure_timer(Type) ->
-    _ = emqx_utils:start_timer(100, {emqx_session, Type}),
+    ensure_timer(Type, 100).
+
+-spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok.
+ensure_timer(Type, Timeout) ->
+    _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
     ok.