Przeglądaj źródła

Merge pull request #9060 from lafirest/fix/delayed_pub

fix(delayed): Improve time precision of delayed messages
lafirest 3 lat temu
rodzic
commit
d600c870af

+ 1 - 0
CHANGES-5.0.md

@@ -3,6 +3,7 @@
 ## Enhancements
 
 * Add `cert_common_name` and `cert_subject` placeholder support for authz_http and authz_mongo.[#8973](https://github.com/emqx/emqx/pull/8973)
+* Use milliseconds internally in emqx_delayed to store the publish time, improving precision.[#9060](https://github.com/emqx/emqx/pull/9060)
 
 ## Bug fixes
 

+ 12 - 10
apps/emqx_modules/src/emqx_delayed.erl

@@ -91,6 +91,7 @@
 -define(SERVER, ?MODULE).
 -define(MAX_INTERVAL, 4294967).
 -define(FORMAT_FUN, {?MODULE, format_delayed}).
+-define(NOW, erlang:system_time(milli_seconds)).
 
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
@@ -118,12 +119,13 @@ on_message_publish(
     {PubAt, Delayed} =
         case binary_to_integer(Delay) of
             Interval when Interval < ?MAX_INTERVAL ->
-                {Interval + erlang:round(Ts / 1000), Interval};
+                {Interval * 1000 + Ts, Interval};
             Timestamp ->
                 %% Check malicious timestamp?
-                case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
+                Internal = Timestamp - erlang:round(Ts / 1000),
+                case Internal > ?MAX_INTERVAL of
                     true -> error(invalid_delayed_timestamp);
-                    false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)}
+                    false -> {Timestamp * 1000, Internal}
                 end
         end,
     PubMsg = Msg#message{topic = Topic1},
@@ -189,14 +191,14 @@ format_delayed(
     WithPayload
 ) ->
     PublishTime = to_rfc3339(PublishTimeStamp div 1000),
-    ExpectTime = to_rfc3339(ExpectTimeStamp),
-    RemainingTime = ExpectTimeStamp - erlang:system_time(second),
+    ExpectTime = to_rfc3339(ExpectTimeStamp div 1000),
+    RemainingTime = ExpectTimeStamp - ?NOW,
     Result = #{
         msgid => emqx_guid:to_hexstr(Id),
         node => node(),
         publish_at => PublishTime,
         delayed_interval => Delayed,
-        delayed_remaining => RemainingTime,
+        delayed_remaining => RemainingTime div 1000,
         expected_at => ExpectTime,
         topic => Topic,
         qos => Qos,
@@ -296,7 +298,7 @@ handle_cast(Msg, State) ->
 
 %% Do Publish...
 handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) ->
-    DeletedKeys = do_publish(mnesia:dirty_first(?TAB), erlang:system_time(seconds)),
+    DeletedKeys = do_publish(mnesia:dirty_first(?TAB), ?NOW),
     lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
     {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})};
 handle_info(stats, State = #{stats_fun := StatsFun}) ->
@@ -347,18 +349,18 @@ ensure_publish_timer(State) ->
 ensure_publish_timer('$end_of_table', State) ->
     State#{publish_timer := undefined, publish_at := 0};
 ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
-    ensure_publish_timer(Ts, erlang:system_time(seconds), State);
+    ensure_publish_timer(Ts, ?NOW, State);
 ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when
     Ts < PubAt
 ->
     ok = emqx_misc:cancel_timer(TRef),
-    ensure_publish_timer(Ts, erlang:system_time(seconds), State);
+    ensure_publish_timer(Ts, ?NOW, State);
 ensure_publish_timer(_Key, State) ->
     State.
 
 ensure_publish_timer(Ts, Now, State) ->
     Interval = max(1, Ts - Now),
-    TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish),
+    TRef = emqx_misc:start_timer(Interval, do_publish),
     State#{publish_timer := TRef, publish_at := Now + Interval}.
 
 do_publish(Key, Now) ->

+ 1 - 1
apps/emqx_modules/src/emqx_modules.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_modules, [
     {description, "EMQX Modules"},
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {applications, [kernel, stdlib, emqx]},
     {mod, {emqx_modules_app, []}},

+ 34 - 0
apps/emqx_modules/test/emqx_delayed_SUITE.erl

@@ -202,3 +202,37 @@ t_get_basic_usage_info(_Config) ->
     ),
     ?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()),
     ok.
+
+t_delayed_precision(_) ->
+    MaxSpan = 1250,
+    FutureDiff = subscribe_proc(),
+    DelayedMsg0 = emqx_message:make(
+        ?MODULE, 1, <<"$delayed/1/delayed/test">>, <<"delayed/test">>
+    ),
+    _ = on_message_publish(DelayedMsg0),
+    ?assert(FutureDiff() =< MaxSpan).
+
+subscribe_proc() ->
+    Self = self(),
+    Ref = erlang:make_ref(),
+    erlang:spawn(fun() ->
+        Topic = <<"delayed/+">>,
+        emqx_broker:subscribe(Topic),
+        Self !
+            {Ref,
+                receive
+                    {deliver, Topic, Msg} ->
+                        erlang:system_time(milli_seconds) - Msg#message.timestamp
+                after 2000 ->
+                    2000
+                end},
+        emqx_broker:unsubscribe(Topic)
+    end),
+    fun() ->
+        receive
+            {Ref, Diff} ->
+                Diff
+        after 2000 ->
+            2000
+        end
+    end.