فهرست منبع

feat: add time-based pull for safety

Thales Macedo Garitezi 2 سال پیش
والد
کامیت
ba96c725e5
1فایلهای تغییر یافته به همراه15 افزوده شده و 1 حذف شده
  1. 15 1
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

+ 15 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -51,6 +51,7 @@
     mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
     pending_acks => [ack_id()],
     pull_max_messages := non_neg_integer(),
+    pull_timer := undefined | reference(),
     subscription_id => subscription_id(),
     topic => emqx_bridge_gcp_pubsub_connector:topic()
 }.
@@ -58,6 +59,7 @@
 
 -define(HEALTH_CHECK_TIMEOUT, 10_000).
 -define(OPTVAR_SUB_OK(PID), {?MODULE, PID}).
+-define(PULL_INTERVAL, 5_000).
 
 %%-------------------------------------------------------------------------------------------------
 %% API used by `reply_delegator'
@@ -159,7 +161,8 @@ init(Config) ->
     State = Config#{
         ack_timer => undefined,
         async_workers => #{},
-        pending_acks => []
+        pending_acks => [],
+        pull_timer => undefined
     },
     {ok, State, {continue, ensure_subscription}}.
 
@@ -202,6 +205,11 @@ handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) ->
     State1 = acknowledge(State0),
     State = ensure_ack_timer(State1),
     {noreply, State};
+handle_info({timeout, TRef, pull}, State0 = #{pull_timer := TRef}) ->
+    State1 = State0#{pull_timer := undefined},
+    State2 = do_pull_async(State1),
+    State = ensure_pull_timer(State2),
+    {noreply, State};
 handle_info(
     {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0}
 ) when
@@ -244,6 +252,12 @@ ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) ->
 ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) ->
     State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}.
 
+-spec ensure_pull_timer(state()) -> state().
+ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) ->
+    State;
+ensure_pull_timer(State) ->
+    State#{pull_timer := emqx_utils:start_timer(?PULL_INTERVAL, pull)}.
+
 -spec ensure_subscription_exists(state()) -> ok | error.
 ensure_subscription_exists(State) ->
     #{