Преглед изворни кода

test(gcp_pubsub_consumer): add more tests and improve bridge

Fixes https://emqx.atlassian.net/browse/EMQX-10309
Thales Macedo Garitezi пре 2 година
родитељ
комит
30e0b4be54

+ 5 - 1
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -417,10 +417,14 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
     ok.
 
 t_on_get_status(Config) ->
+    t_on_get_status(Config, _Opts = #{}).
+
+t_on_get_status(Config, Opts) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
     ResourceId = resource_id(Config),
+    FailureStatus = maps:get(failure_status, Opts, disconnected),
     ?assertMatch({ok, _}, create_bridge(Config)),
     %% Since the connection process is async, we give it some time to
     %% stabilize and avoid flakiness.
@@ -431,7 +435,7 @@ t_on_get_status(Config) ->
     ),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         ct:sleep(500),
-        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+        ?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
     end),
     %% Check that it recovers itself.
     ?retry(

+ 9 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl

@@ -139,6 +139,15 @@ fields(producer) ->
     ];
 fields(consumer) ->
     [
+        %% Note: the minimum ack deadline that GCP PubSub accepts is 10 s.
+        {ack_deadline,
+            mk(
+                emqx_schema:timeout_duration_s(),
+                #{
+                    default => <<"60s">>,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {ack_retry_interval,
             mk(
                 emqx_schema:timeout_duration_ms(),

+ 18 - 11
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl

@@ -119,6 +119,7 @@ start(
     ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}),
     case ehttpc_sup:start_pool(ResourceId, PoolOpts) of
         {ok, _} ->
+            ?tp(gcp_pubsub_ehttpc_pool_started, #{pool_name => ResourceId}),
             {ok, State};
         {error, {already_started, _}} ->
             ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}),
@@ -166,7 +167,7 @@ query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State)
     {prepared_request, prepared_request()},
     {ReplyFun :: function(), Args :: list()},
     state()
-) -> {ok, pid()}.
+) -> {ok, pid()} | {error, no_pool_worker_available}.
 query_async(
     {prepared_request, PreparedRequest = {_Method, _Path, _Body}},
     ReplyFunAndArgs,
@@ -306,7 +307,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) ->
     state(),
     {prepared_request, prepared_request()},
     {ReplyFun :: function(), Args :: list()}
-) -> {ok, pid()}.
+) -> {ok, pid()} | {error, no_pool_worker_available}.
 do_send_requests_async(
     State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs
 ) ->
@@ -323,21 +324,27 @@ do_send_requests_async(
         }
     ),
     Request = to_ehttpc_request(State, Method, Path, Body),
-    Worker = ehttpc_pool:pick_worker(PoolName),
-    ok = ehttpc:request_async(
-        Worker,
-        Method,
-        Request,
-        RequestTTL,
-        {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]}
-    ),
-    {ok, Worker}.
+    %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
+    case ehttpc_pool:pick_worker(PoolName) of
+        false ->
+            {error, no_pool_worker_available};
+        Worker ->
+            ok = ehttpc:request_async(
+                Worker,
+                Method,
+                Request,
+                RequestTTL,
+                {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]}
+            ),
+            {ok, Worker}
+    end.
 
 to_ehttpc_request(State, Method, Path, Body) ->
     #{jwt_config := JWTConfig} = State,
     Headers = get_jwt_authorization_header(JWTConfig),
     case {Method, Body} of
         {get, <<>>} -> {Path, Headers};
+        {delete, <<>>} -> {Path, Headers};
         _ -> {Path, Headers, Body}
     end.
 

+ 325 - 146
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl

@@ -24,44 +24,58 @@
 ]).
 
 -export([get_subscription/1]).
--export([reply_delegator/3, pull_async/1, process_pull_response/2, ensure_subscription/1]).
+-export([reply_delegator/4, pull_async/1, process_pull_response/2, ensure_subscription/1]).
 
 -type subscription_id() :: binary().
 -type bridge_name() :: atom() | binary().
 -type ack_id() :: binary().
+-type message_id() :: binary().
 -type config() :: #{
+    ack_deadline := emqx_schema:timeout_duration_s(),
     ack_retry_interval := emqx_schema:timeout_duration_ms(),
     client := emqx_bridge_gcp_pubsub_client:state(),
     ecpool_worker_id => non_neg_integer(),
+    forget_interval := timer:time(),
     hookpoint := binary(),
     instance_id := binary(),
     mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
     project_id := emqx_bridge_gcp_pubsub_client:project_id(),
     pull_max_messages := non_neg_integer(),
+    pull_retry_interval := emqx_schema:timeout_duration_ms(),
     subscription_id => subscription_id(),
     topic => emqx_bridge_gcp_pubsub_client:topic()
 }.
 -type state() :: #{
+    ack_deadline := emqx_schema:timeout_duration_s(),
     ack_retry_interval := emqx_schema:timeout_duration_ms(),
     ack_timer := undefined | reference(),
     async_workers := #{pid() => reference()},
     client := emqx_bridge_gcp_pubsub_client:state(),
     ecpool_worker_id := non_neg_integer(),
+    forget_interval := timer:time(),
     hookpoint := binary(),
     instance_id := binary(),
-    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
-    pending_acks => [ack_id()],
+    mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
+    pending_acks := #{message_id() => ack_id()},
     project_id := emqx_bridge_gcp_pubsub_client:project_id(),
     pull_max_messages := non_neg_integer(),
+    pull_retry_interval := emqx_schema:timeout_duration_ms(),
     pull_timer := undefined | reference(),
-    subscription_id => subscription_id(),
-    topic => emqx_bridge_gcp_pubsub_client:topic()
+    %% In order to avoid re-processing the same message twice due to race conditions
+    %% between acknowledging a message and receiving a duplicate pulled message, we need
+    %% to keep the seen message IDs for a while...
+    seen_message_ids := sets:set(message_id()),
+    subscription_id := subscription_id(),
+    topic := emqx_bridge_gcp_pubsub_client:topic()
 }.
 -type decoded_message() :: map().
 
+%% initialization states
+-define(ensure_subscription, ensure_subscription).
+-define(patch_subscription, patch_subscription).
+
 -define(HEALTH_CHECK_TIMEOUT, 10_000).
--define(OPTVAR_SUB_OK(PID), {?MODULE, PID}).
--define(PULL_INTERVAL, 5_000).
+-define(OPTVAR_SUB_OK(PID), {?MODULE, subscription_ok, PID}).
 
 %%-------------------------------------------------------------------------------------------------
 %% API used by `reply_delegator'
@@ -79,19 +93,23 @@ process_pull_response(WorkerPid, RespBody) ->
 ensure_subscription(WorkerPid) ->
     gen_server:cast(WorkerPid, ensure_subscription).
 
--spec reply_delegator(pid(), binary(), {ok, map()} | {error, timeout | term()}) -> ok.
-reply_delegator(WorkerPid, InstanceId, Result) ->
+-spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok.
+reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) ->
+    ?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}),
     case Result of
         {error, timeout} ->
             ?MODULE:pull_async(WorkerPid);
         {error, Reason} ->
-            ?SLOG(warning, #{
-                msg => "gcp_pubsub_consumer_worker_pull_error",
-                instance_id => InstanceId,
-                reason => Reason
-            }),
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_pull_error",
+                #{
+                    instance_id => InstanceId,
+                    reason => Reason
+                }
+            ),
             case Reason of
-                #{status_code := 409} ->
+                #{status_code := 404} ->
                     %% the subscription was not found; deleted?!
                     ?MODULE:ensure_subscription(WorkerPid);
                 _ ->
@@ -113,13 +131,13 @@ get_subscription(WorkerPid) ->
 %% `ecpool' health check
 %%-------------------------------------------------------------------------------------------------
 
--spec health_check(pid()) -> boolean().
+-spec health_check(pid()) -> subscription_ok | topic_not_found | timeout.
 health_check(WorkerPid) ->
     case optvar:read(?OPTVAR_SUB_OK(WorkerPid), ?HEALTH_CHECK_TIMEOUT) of
-        {ok, _} ->
-            true;
+        {ok, Status} ->
+            Status;
         timeout ->
-            false
+            timeout
     end.
 
 %%-------------------------------------------------------------------------------------------------
@@ -129,30 +147,36 @@ health_check(WorkerPid) ->
 connect(Opts0) ->
     Opts = maps:from_list(Opts0),
     #{
+        ack_deadline := AckDeadlineSeconds,
         ack_retry_interval := AckRetryInterval,
         bridge_name := BridgeName,
         client := Client,
         ecpool_worker_id := WorkerId,
+        forget_interval := ForgetInterval,
         hookpoint := Hookpoint,
         instance_id := InstanceId,
         project_id := ProjectId,
         pull_max_messages := PullMaxMessages,
+        pull_retry_interval := PullRetryInterval,
         topic_mapping := TopicMapping
     } = Opts,
     TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)),
     Index = 1 + (WorkerId rem map_size(TopicMapping)),
     {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList),
     Config = #{
+        ack_deadline => AckDeadlineSeconds,
         ack_retry_interval => AckRetryInterval,
         %% Note: the `client' value here must be immutable and not changed by the
         %% bridge during `on_get_status', since we have handed it over to the pull
         %% workers.
         client => Client,
+        forget_interval => ForgetInterval,
         hookpoint => Hookpoint,
         instance_id => InstanceId,
         mqtt_config => MQTTConfig,
         project_id => ProjectId,
         pull_max_messages => PullMaxMessages,
+        pull_retry_interval => PullRetryInterval,
         topic => Topic,
         subscription_id => subscription_id(BridgeName, Topic)
     },
@@ -162,33 +186,55 @@ connect(Opts0) ->
 %% `gen_server' API
 %%-------------------------------------------------------------------------------------------------
 
--spec init(config()) -> {ok, state(), {continue, ensure_subscription}}.
+-spec init(config()) -> {ok, state(), {continue, ?ensure_subscription}}.
 init(Config) ->
     process_flag(trap_exit, true),
     State = Config#{
         ack_timer => undefined,
         async_workers => #{},
-        pending_acks => [],
-        pull_timer => undefined
+        pending_acks => #{},
+        pull_timer => undefined,
+        seen_message_ids => sets:new([{version, 2}])
     },
-    {ok, State, {continue, ensure_subscription}}.
+    ?tp(gcp_pubsub_consumer_worker_init, #{topic => maps:get(topic, State)}),
+    {ok, State, {continue, ?ensure_subscription}}.
 
-handle_continue(ensure_subscription, State0) ->
+handle_continue(?ensure_subscription, State0) ->
     case ensure_subscription_exists(State0) of
-        ok ->
+        already_exists ->
+            {noreply, State0, {continue, ?patch_subscription}};
+        continue ->
             #{instance_id := InstanceId} = State0,
+            ?MODULE:pull_async(self()),
+            optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
             ?tp(
                 debug,
                 "gcp_pubsub_consumer_worker_subscription_ready",
                 #{instance_id => InstanceId}
             ),
+            {noreply, State0};
+        retry ->
+            {noreply, State0, {continue, ?ensure_subscription}};
+        not_found ->
+            %% there's nothing much to do if the topic suddenly doesn't exist anymore.
+            {stop, {error, topic_not_found}, State0}
+    end;
+handle_continue(?patch_subscription, State0) ->
+    ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
+    case patch_subscription(State0) of
+        ok ->
+            #{instance_id := InstanceId} = State0,
             ?MODULE:pull_async(self()),
             optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok),
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_ready",
+                #{instance_id => InstanceId}
+            ),
             {noreply, State0};
         error ->
-            %% FIXME: add delay if topic does not exist?!
             %% retry
-            {noreply, State0, {continue, ensure_subscription}}
+            {noreply, State0, {continue, ?patch_subscription}}
     end.
 
 handle_call(get_subscription, _From, State0) ->
@@ -201,21 +247,20 @@ handle_cast(pull_async, State0) ->
     State = do_pull_async(State0),
     {noreply, State};
 handle_cast({process_pull_response, RespBody}, State0) ->
+    ?tp(gcp_pubsub_consumer_worker_pull_response_received, #{}),
     State = do_process_pull_response(State0, RespBody),
     {noreply, State};
 handle_cast(ensure_subscription, State0) ->
-    {noreply, State0, {continue, ensure_subscription}};
+    {noreply, State0, {continue, ?ensure_subscription}};
 handle_cast(_Request, State0) ->
     {noreply, State0}.
 
 handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) ->
-    State1 = acknowledge(State0),
-    State = ensure_ack_timer(State1),
+    State = acknowledge(State0),
     {noreply, State};
 handle_info({timeout, TRef, pull}, State0 = #{pull_timer := TRef}) ->
     State1 = State0#{pull_timer := undefined},
-    State2 = do_pull_async(State1),
-    State = ensure_pull_timer(State2),
+    State = do_pull_async(State1),
     {noreply, State};
 handle_info(
     {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0}
@@ -225,6 +270,13 @@ handle_info(
     Workers = maps:remove(AsyncWorkerPid, Workers0),
     State1 = State0#{async_workers := Workers},
     State = do_pull_async(State1),
+    ?tp(gcp_pubsub_consumer_worker_handled_async_worker_down, #{async_worker_pid => AsyncWorkerPid}),
+    {noreply, State};
+handle_info({forget_message_ids, MsgIds}, State0) ->
+    State = maps:update_with(
+        seen_message_ids, fun(Seen) -> sets:subtract(Seen, MsgIds) end, State0
+    ),
+    ?tp(gcp_pubsub_consumer_worker_message_ids_forgotten, #{message_ids => MsgIds}),
     {noreply, State};
 handle_info(Msg, State0) ->
     #{
@@ -239,8 +291,18 @@ handle_info(Msg, State0) ->
     }),
     {noreply, State0}.
 
+terminate({error, topic_not_found} = _Reason, State) ->
+    #{
+        instance_id := InstanceId,
+        topic := _Topic
+    } = State,
+    optvar:unset(?OPTVAR_SUB_OK(self())),
+    emqx_bridge_gcp_pubsub_impl_consumer:mark_topic_as_nonexistent(InstanceId),
+    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => _Topic}),
+    ok;
 terminate(_Reason, _State) ->
     optvar:unset(?OPTVAR_SUB_OK(self())),
+    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => maps:get(topic, _State)}),
     ok.
 
 %%-------------------------------------------------------------------------------------------------
@@ -252,21 +314,24 @@ start_link(Config) ->
     gen_server:start_link(?MODULE, Config, []).
 
 -spec ensure_ack_timer(state()) -> state().
-ensure_ack_timer(State = #{pending_acks := []}) ->
-    State;
-ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) ->
-    State;
-ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) ->
-    State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}.
+ensure_ack_timer(State = #{ack_timer := TRef, pending_acks := PendingAcks}) ->
+    case {map_size(PendingAcks) =:= 0, is_reference(TRef)} of
+        {false, false} ->
+            #{ack_retry_interval := AckRetryInterval} = State,
+            State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)};
+        {_, _} ->
+            State
+    end.
 
 -spec ensure_pull_timer(state()) -> state().
 ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) ->
     State;
-ensure_pull_timer(State) ->
-    State#{pull_timer := emqx_utils:start_timer(?PULL_INTERVAL, pull)}.
+ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) ->
+    State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}.
 
--spec ensure_subscription_exists(state()) -> ok | error.
+-spec ensure_subscription_exists(state()) -> continue | retry | not_found | already_exists.
 ensure_subscription_exists(State) ->
+    ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
     #{
         client := Client,
         instance_id := InstanceId,
@@ -281,60 +346,122 @@ ensure_subscription_exists(State) ->
     case Res of
         {error, #{status_code := 409}} ->
             %% already exists
-            ?SLOG(debug, #{
-                msg => "gcp_pubsub_consumer_worker_subscription_already_exists",
-                instance_id => InstanceId,
-                topic => Topic,
-                subscription_id => SubscriptionId
-            }),
-            Method1 = patch,
-            Path1 = path(State, create),
-            Body1 = body(State, patch_subscription),
-            PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
-            Res1 = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
-            ?SLOG(debug, #{
-                msg => "gcp_pubsub_consumer_worker_subscription_patch",
-                instance_id => InstanceId,
-                topic => Topic,
-                subscription_id => SubscriptionId,
-                result => Res1
-            }),
-            ok;
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_already_exists",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic,
+                    subscription_id => SubscriptionId
+                }
+            ),
+            already_exists;
+        {error, #{status_code := 404}} ->
+            %% nonexistent topic
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_nonexistent_topic",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic
+                }
+            ),
+            not_found;
         {ok, #{status_code := 200}} ->
-            ?SLOG(debug, #{
-                msg => "gcp_pubsub_consumer_worker_subscription_created",
-                instance_id => InstanceId,
-                topic => Topic,
-                subscription_id => SubscriptionId
-            }),
-            ok;
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_created",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic,
+                    subscription_id => SubscriptionId
+                }
+            ),
+            continue;
         {error, Reason} ->
-            ?SLOG(error, #{
-                msg => "gcp_pubsub_consumer_worker_subscription_error",
-                instance_id => InstanceId,
-                topic => Topic,
-                reason => Reason
-            }),
-            error
+            ?tp(
+                error,
+                "gcp_pubsub_consumer_worker_subscription_error",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic,
+                    reason => Reason
+                }
+            ),
+            retry
     end.
 
-%% We use async requests so that this process will be more responsive to system messages.
-do_pull_async(State) ->
+-spec patch_subscription(state()) -> ok | error.
+patch_subscription(State) ->
     #{
         client := Client,
-        instance_id := InstanceId
+        instance_id := InstanceId,
+        subscription_id := SubscriptionId,
+        topic := Topic
     } = State,
-    Method = post,
-    Path = path(State, pull),
-    Body = body(State, pull),
-    PreparedRequest = {prepared_request, {Method, Path, Body}},
-    ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]},
-    {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_client:query_async(
-        PreparedRequest,
-        ReplyFunAndArgs,
-        Client
-    ),
-    ensure_async_worker_monitored(State, AsyncWorkerPid).
+    Method1 = patch,
+    Path1 = path(State, create),
+    Body1 = body(State, patch_subscription),
+    PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}},
+    Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client),
+    case Res of
+        {ok, _} ->
+            ?tp(
+                debug,
+                "gcp_pubsub_consumer_worker_subscription_patched",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic,
+                    subscription_id => SubscriptionId,
+                    result => Res
+                }
+            ),
+            ok;
+        {error, Reason} ->
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_subscription_patch_error",
+                #{
+                    instance_id => InstanceId,
+                    topic => Topic,
+                    subscription_id => SubscriptionId,
+                    reason => Reason
+                }
+            ),
+            error
+    end.
+
+%% We use async requests so that this process will be more responsive to system messages.
+-spec do_pull_async(state()) -> state().
+do_pull_async(State0) ->
+    ?tp_span(
+        gcp_pubsub_consumer_worker_pull_async,
+        #{topic => maps:get(topic, State0), subscription_id => maps:get(subscription_id, State0)},
+        begin
+            #{
+                client := Client,
+                instance_id := InstanceId
+            } = State0,
+            Method = post,
+            Path = path(State0, pull),
+            Body = body(State0, pull),
+            PreparedRequest = {prepared_request, {Method, Path, Body}},
+            ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]},
+            %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers...
+            Res = emqx_bridge_gcp_pubsub_client:query_async(
+                PreparedRequest,
+                ReplyFunAndArgs,
+                Client
+            ),
+            case Res of
+                {ok, AsyncWorkerPid} ->
+                    State1 = ensure_pull_timer(State0),
+                    ensure_async_worker_monitored(State1, AsyncWorkerPid);
+                {error, no_pool_worker_available} ->
+                    ensure_pull_timer(State0)
+            end
+        end
+    ).
 
 -spec ensure_async_worker_monitored(state(), pid()) -> state().
 ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerPid) ->
@@ -349,22 +476,58 @@ ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerP
 
 -spec do_process_pull_response(state(), binary()) -> state().
 do_process_pull_response(State0, RespBody) ->
+    #{
+        pending_acks := PendingAcks,
+        seen_message_ids := SeenMsgIds
+    } = State0,
     Messages = decode_response(RespBody),
-    AckIds = lists:map(fun(Msg) -> handle_message(State0, Msg) end, Messages),
-    State1 = maps:update_with(pending_acks, fun(AckIds0) -> AckIds0 ++ AckIds end, State0),
+    ?tp(gcp_pubsub_consumer_worker_decoded_messages, #{messages => Messages}),
+    {NewPendingAcks, NewSeenMsgIds} =
+        lists:foldl(
+            fun(
+                Msg = #{
+                    <<"ackId">> := AckId,
+                    <<"message">> := #{<<"messageId">> := MsgId}
+                },
+                {AccAck, AccSeen}
+            ) ->
+                case is_map_key(MsgId, PendingAcks) or sets:is_element(MsgId, SeenMsgIds) of
+                    true ->
+                        ?tp(message_redelivered, #{message => Msg}),
+                        %% even though it was redelivered, pubsub might change the ack
+                        %% id...  we should ack this latest value.
+                        {AccAck#{MsgId => AckId}, AccSeen};
+                    false ->
+                        _ = handle_message(State0, Msg),
+                        {AccAck#{MsgId => AckId}, sets:add_element(MsgId, AccSeen)}
+                end
+            end,
+            {PendingAcks, SeenMsgIds},
+            Messages
+        ),
+    State1 = State0#{pending_acks := NewPendingAcks, seen_message_ids := NewSeenMsgIds},
     State2 = acknowledge(State1),
     pull_async(self()),
-    ensure_ack_timer(State2).
+    State2.
 
 -spec acknowledge(state()) -> state().
-acknowledge(State0 = #{pending_acks := []}) ->
-    State0;
-acknowledge(State0) ->
+acknowledge(State0 = #{pending_acks := PendingAcks}) ->
+    case map_size(PendingAcks) =:= 0 of
+        true ->
+            State0;
+        false ->
+            do_acknowledge(State0)
+    end.
+
+do_acknowledge(State0) ->
+    ?tp(gcp_pubsub_consumer_worker_acknowledge_enter, #{}),
     State1 = State0#{ack_timer := undefined},
     #{
         client := Client,
-        pending_acks := AckIds
+        forget_interval := ForgetInterval,
+        pending_acks := PendingAcks
     } = State1,
+    AckIds = maps:values(PendingAcks),
     Method = post,
     Path = path(State1, ack),
     Body = body(State1, ack, #{ack_ids => AckIds}),
@@ -372,16 +535,27 @@ acknowledge(State0) ->
     Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client),
     case Res of
         {error, Reason} ->
-            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}),
-            State1;
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_ack_error",
+                #{reason => Reason}
+            ),
+            ensure_ack_timer(State1);
         {ok, #{status_code := 200}} ->
-            ?tp(gcp_pubsub_consumer_worker_acknowledged, #{ack_ids => AckIds}),
-            State1#{pending_acks := []};
+            ?tp(gcp_pubsub_consumer_worker_acknowledged, #{acks => PendingAcks}),
+            MsgIds = maps:keys(PendingAcks),
+            forget_message_ids_after(MsgIds, ForgetInterval),
+            State1#{pending_acks := #{}};
         {ok, Details} ->
-            ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", details => Details}),
-            State1
+            ?tp(
+                warning,
+                "gcp_pubsub_consumer_worker_ack_error",
+                #{details => Details}
+            ),
+            ensure_ack_timer(State1)
     end.
 
+-spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}.
 do_get_subscription(State) ->
     #{
         client := Client
@@ -442,12 +616,11 @@ body(State, pull) ->
     emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages});
 body(State, create) ->
     #{
-        ack_retry_interval := AckRetryInterval,
+        ack_deadline := AckDeadlineSeconds,
         project_id := ProjectId,
         topic := PubSubTopic
     } = State,
     TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
-    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
     JSON = #{
         <<"topic">> => TopicResource,
         <<"ackDeadlineSeconds">> => AckDeadlineSeconds
@@ -455,14 +628,13 @@ body(State, create) ->
     emqx_utils_json:encode(JSON);
 body(State, patch_subscription) ->
     #{
-        ack_retry_interval := AckRetryInterval,
+        ack_deadline := AckDeadlineSeconds,
         project_id := ProjectId,
         topic := PubSubTopic,
         subscription_id := SubscriptionId
     } = State,
     TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
     SubscriptionResource = subscription_resource(ProjectId, SubscriptionId),
-    AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second),
     JSON = #{
         <<"subscription">> =>
             #{
@@ -505,50 +677,52 @@ decode_response(RespBody) ->
             []
     end.
 
--spec handle_message(state(), decoded_message()) -> [ack_id()].
+-spec handle_message(state(), decoded_message()) -> ok.
 handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) ->
-    ?tp(
+    ?tp_span(
         debug,
         "gcp_pubsub_consumer_worker_handle_message",
-        #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}
-    ),
-    #{
-        instance_id := InstanceId,
-        hookpoint := Hookpoint,
-        mqtt_config := #{
-            payload_template := PayloadTemplate,
-            qos := MQTTQoS,
-            mqtt_topic := MQTTTopic
-        },
-        topic := Topic
-    } = State,
-    #{
-        <<"messageId">> := MessageId,
-        <<"publishTime">> := PublishTime
-    } = InnerMsg,
-    FullMessage0 = #{
-        message_id => MessageId,
-        publish_time => PublishTime,
-        topic => Topic
-    },
-    FullMessage =
-        lists:foldl(
-            fun({FromKey, ToKey}, Acc) ->
-                add_if_present(FromKey, InnerMsg, ToKey, Acc)
-            end,
-            FullMessage0,
-            [
-                {<<"data">>, value},
-                {<<"attributes">>, attributes},
-                {<<"orderingKey">>, ordering_key}
-            ]
-        ),
-    Payload = render(FullMessage, PayloadTemplate),
-    MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
-    _ = emqx:publish(MQTTMessage),
-    emqx:run_hook(Hookpoint, [FullMessage]),
-    emqx_resource_metrics:received_inc(InstanceId),
-    AckId.
+        #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId},
+        begin
+            #{
+                instance_id := InstanceId,
+                hookpoint := Hookpoint,
+                mqtt_config := #{
+                    payload_template := PayloadTemplate,
+                    qos := MQTTQoS,
+                    mqtt_topic := MQTTTopic
+                },
+                topic := Topic
+            } = State,
+            #{
+                <<"messageId">> := MessageId,
+                <<"publishTime">> := PublishTime
+            } = InnerMsg,
+            FullMessage0 = #{
+                message_id => MessageId,
+                publish_time => PublishTime,
+                topic => Topic
+            },
+            FullMessage =
+                lists:foldl(
+                    fun({FromKey, ToKey}, Acc) ->
+                        add_if_present(FromKey, InnerMsg, ToKey, Acc)
+                    end,
+                    FullMessage0,
+                    [
+                        {<<"data">>, value},
+                        {<<"attributes">>, attributes},
+                        {<<"orderingKey">>, ordering_key}
+                    ]
+                ),
+            Payload = render(FullMessage, PayloadTemplate),
+            MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
+            _ = emqx:publish(MQTTMessage),
+            emqx:run_hook(Hookpoint, [FullMessage]),
+            emqx_resource_metrics:received_inc(InstanceId),
+            ok
+        end
+    ).
 
 -spec add_if_present(any(), map(), any(), map()) -> map().
 add_if_present(FromKey, Message, ToKey, Map) ->
@@ -563,6 +737,11 @@ render(FullMessage, PayloadTemplate) ->
     Opts = #{return => full_binary},
     emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts).
 
+%% Arms a timer that delivers `{forget_message_ids, Set}' to the calling process
+%% once `Timeout' has elapsed; `Set' is a version-2 `sets' set built from `MsgIds0'.
+forget_message_ids_after(MsgIds0, Timeout) ->
+    ForgetMsg = {forget_message_ids, sets:from_list(MsgIds0, [{version, 2}])},
+    _TimerRef = erlang:send_after(Timeout, self(), ForgetMsg),
+    ok.
+
 %% Normalizes an atom, a list (iolist/string) or a binary into a binary.
 %% Any other term fails with `function_clause'.
 to_bin(Bin) when is_binary(Bin) -> Bin;
 to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
 to_bin(IoList) when is_list(IoList) -> iolist_to_binary(IoList).

+ 98 - 23
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl

@@ -15,6 +15,13 @@
     on_get_status/2
 ]).
 
+%% health check API
+-export([
+    mark_topic_as_nonexistent/1,
+    unset_nonexistent_topic/1,
+    is_nonexistent_topic/1
+]).
+
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -39,6 +46,12 @@
 -export_type([mqtt_config/0]).
 
 -define(AUTO_RECONNECT_S, 2).
+-define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)).  % fallback returned by forget_interval/1 when the request TTL is `infinity'
+-define(OPTVAR_TOPIC_NOT_FOUND(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}).  % optvar key of the per-instance "topic not found" flag
+-define(TOPIC_MESSAGE,  % reason text paired with `unhealthy_target' when topics are missing
+    "GCP PubSub topics are invalid.  Please check the logs, check if the "
+    "topics exist in GCP and if the service account has permissions to use them."
+).
 
 %%-------------------------------------------------------------------------------------------------
 %% `emqx_resource' API
@@ -61,21 +74,45 @@ on_start(InstanceId, Config) ->
 
 -spec on_stop(resource_id(), state()) -> ok | {error, term()}.
 on_stop(InstanceId, _State) ->  % resource stop callback; the state argument is not used
+    ?tp(gcp_pubsub_consumer_stop_enter, #{}),  % trace point emitted on entry
+    unset_nonexistent_topic(InstanceId),  % clear the "topic not found" flag kept for this instance
     ok = stop_consumers(InstanceId),  % badmatch crash unless stop_consumers/1 returns `ok'
     emqx_bridge_gcp_pubsub_client:stop(InstanceId).  % the client's stop result is the callback's return value
 
--spec on_get_status(resource_id(), state()) -> connected | disconnected.
-on_get_status(InstanceId, _State) ->
-    %% Note: do *not* alter the `client' value here.  It must be immutable, since
-    %% we have handed it over to the pull workers.
-    case
-        emqx_resource_pool:health_check_workers(
-            InstanceId,
-            fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1
-        )
-    of
-        true -> connected;
-        false -> connecting
+-spec on_get_status(resource_id(), state()) -> connected | connecting | {disconnected, state(), _}.
+on_get_status(InstanceId, State) ->
+    %% The "topic not found" flag is consulted on its own, before looking at the
+    %% workers, because the workers might already be gone when we check them.
+    TopicMissing = is_nonexistent_topic(InstanceId),
+    status_for_topic_flag(TopicMissing, InstanceId, State).
+
+%% Translates the "topic not found" flag into a resource status: a set flag means
+%% `disconnected' with an `unhealthy_target' reason; otherwise the status is
+%% whatever the worker/client health check reports.
+status_for_topic_flag(true, _InstanceId, State) ->
+    {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
+status_for_topic_flag(false, InstanceId, State) ->
+    #{client := Client} = State,
+    check_workers(InstanceId, Client).
+
+%%-------------------------------------------------------------------------------------------------
+%% Health check API (signalled by consumer worker)
+%%-------------------------------------------------------------------------------------------------
+
+%% Raises the "topic not found" flag for the given resource instance by storing
+%% `true' under the instance's optvar key.
+-spec mark_topic_as_nonexistent(resource_id()) -> ok.
+mark_topic_as_nonexistent(InstanceId) ->
+    FlagKey = ?OPTVAR_TOPIC_NOT_FOUND(InstanceId),
+    optvar:set(FlagKey, true),
+    ok.
+
+%% Clears the "topic not found" flag of the given resource instance, then emits
+%% a trace point.
+-spec unset_nonexistent_topic(resource_id()) -> ok.
+unset_nonexistent_topic(InstanceId) ->
+    FlagKey = ?OPTVAR_TOPIC_NOT_FOUND(InstanceId),
+    optvar:unset(FlagKey),
+    ?tp(gcp_pubsub_consumer_unset_nonexistent_topic, #{}),
+    ok.
+
+%% Tells whether the "topic not found" flag is currently set for the instance.
+%% Only an exact `{ok, true}' peek result counts as set; anything else is `false'.
+-spec is_nonexistent_topic(resource_id()) -> boolean().
+is_nonexistent_topic(InstanceId) ->
+    optvar:peek(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)) =:= {ok, true}.
 
 %%-------------------------------------------------------------------------------------------------
@@ -87,6 +124,7 @@ start_consumers(InstanceId, Client, Config) ->
         bridge_name := BridgeName,
         consumer := ConsumerConfig0,
         hookpoint := Hookpoint,
+        resource_opts := #{request_ttl := RequestTTL},
         service_account_json := #{project_id := ProjectId}
     } = Config,
     ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
@@ -97,22 +135,27 @@ start_consumers(InstanceId, Client, Config) ->
         auto_reconnect => ?AUTO_RECONNECT_S,
         bridge_name => BridgeName,
         client => Client,
+        forget_interval => forget_interval(RequestTTL),
         hookpoint => Hookpoint,
         instance_id => InstanceId,
         pool_size => PoolSize,
-        project_id => ProjectId
+        project_id => ProjectId,
+        pull_retry_interval => RequestTTL
     },
     ConsumerOpts = maps:to_list(ConsumerConfig),
-    %% FIXME: mark as unhealthy if topics do not exist!
     case validate_pubsub_topics(TopicMapping, Client) of
         ok ->
             ok;
-        error ->
+        {error, not_found} ->
             _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId),
             throw(
-                "GCP PubSub topics are invalid.  Please check the logs, check if the "
-                "topic exists in GCP and if the service account has permissions to use them."
-            )
+                {unhealthy_target, ?TOPIC_MESSAGE}
+            );
+        {error, _} ->
+            %% connection might be down; we'll have to check topic existence during health
+            %% check, or the workers will kill themselves when they realize there's no
+            %% topic when upserting their subscription.
+            ok
     end,
     case
         emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
@@ -170,8 +213,8 @@ do_validate_pubsub_topics(Client, [Topic | Rest]) ->
     case check_for_topic_existence(Topic, Client) of
         ok ->
             do_validate_pubsub_topics(Client, Rest);
-        {error, _} ->
-            error
+        {error, _} = Err ->
+            Err
     end;
 do_validate_pubsub_topics(_Client, []) ->
     %% we already validate that the mapping is not empty in the config schema.
@@ -184,9 +227,38 @@ check_for_topic_existence(Topic, Client) ->
             ok;
         {error, #{status_code := 404}} ->
             {error, not_found};
-        {error, Details} ->
-            ?tp(warning, "gcp_pubsub_consumer_check_topic_error", Details),
-            {error, Details}
+        {error, Reason} ->
+            ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}),
+            {error, Reason}
+    end.
+
+%% Maps the client's own status onto a resource status: a `disconnected' client
+%% is reported as `connecting', a `connected' one as `connected'.
+-spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting.
+get_client_status(Client) ->
+    ClientStatus = emqx_bridge_gcp_pubsub_client:get_status(Client),
+    case ClientStatus of
+        connected -> connected;
+        disconnected -> connecting
+    end.
+
+%% Health-checks every consumer worker of the pool and derives the resource status.
+%%
+%% Returns the client's status (see `get_client_status/1') only when at least one
+%% worker answered and all of them reported `subscription_ok'.  Every other outcome
+%% (no workers, a worker in another state, or a failed pool check) is `connecting'.
+-spec check_workers(resource_id(), emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting.
+check_workers(InstanceId, Client) ->
+    case
+        emqx_resource_pool:health_check_workers(
+            InstanceId,
+            fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1,
+            emqx_resource_pool:health_check_timeout(),
+            #{return_values => true}
+        )
+    of
+        {ok, []} ->
+            %% `lists:all/2' is vacuously true for an empty list, so a pool without
+            %% any worker must be handled explicitly or it would look healthy.
+            connecting;
+        {ok, Values} ->
+            AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values),
+            case AllOk of
+                true ->
+                    get_client_status(Client);
+                false ->
+                    connecting
+            end;
+        {error, _} ->
+            connecting
     end.
 
 log_when_error(Fun, Log) ->
@@ -199,3 +271,6 @@ log_when_error(Fun, Log) ->
                 reason => E
             })
     end.
+
+%% Computes the `forget_interval' consumer option from the request TTL: twice the
+%% TTL, or the default interval when the TTL is `infinity'.
+forget_interval(infinity) ->
+    ?DEFAULT_FORGET_INTERVAL;
+forget_interval(RequestTTL) ->
+    RequestTTL * 2.

+ 3 - 3
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -103,7 +103,7 @@ on_query(ResourceId, {send_message, Selected}, State) ->
     {send_message, map()},
     {ReplyFun :: function(), Args :: list()},
     state()
-) -> {ok, pid()}.
+) -> {ok, pid()} | {error, no_pool_worker_available}.
 on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) ->
     Requests = [{send_message, Selected}],
     ?TRACE(
@@ -134,7 +134,7 @@ on_batch_query(ResourceId, Requests, State) ->
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()},
     state()
-) -> {ok, pid()}.
+) -> {ok, pid()} | {error, no_pool_worker_available}.
 on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) ->
     ?TRACE(
         "QUERY_ASYNC",
@@ -177,7 +177,7 @@ do_send_requests_sync(State, Requests, InstanceId) ->
     state(),
     [{send_message, map()}],
     {ReplyFun :: function(), Args :: list()}
-) -> {ok, pid()}.
+) -> {ok, pid()} | {error, no_pool_worker_available}.
 do_send_requests_async(State, Requests, ReplyFunAndArgs0) ->
     #{client := Client} = State,
     Payloads =

Разлика између датотеке није приказан због своје велике величине
+ 1406 - 52
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl


+ 4 - 0
apps/emqx_resource/src/emqx_resource_pool.erl

@@ -19,6 +19,7 @@
 -export([
     start/3,
     stop/1,
+    health_check_timeout/0,
     health_check_workers/2,
     health_check_workers/3,
     health_check_workers/4
@@ -66,6 +67,9 @@ stop(Name) ->
             error({stop_pool_failed, Name, Reason})
     end.
 
+health_check_timeout() ->  % exposes the pool's health-check timeout to callers of health_check_workers/4
+    ?HEALTH_CHECK_TIMEOUT.  % the same macro health_check_workers/2 passes as its timeout
+
 health_check_workers(PoolName, CheckFunc) ->  % 2-arity shortcut: default timeout, empty options map
     health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}).