Просмотр исходного кода

refactor(gcp_pubsub): move logging from connector to bridge

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
bb49482529

+ 58 - 113
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl

@@ -286,98 +286,14 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
     ),
     Headers = get_jwt_authorization_header(JWTConfig),
     Request = {Path, Headers, Body},
-    case
-        ehttpc:request(
-            PoolName,
-            Method,
-            Request,
-            RequestTTL,
-            MaxRetries
-        )
-    of
-        {error, Reason} when
-            Reason =:= econnrefused;
-            %% this comes directly from `gun'...
-            Reason =:= {closed, "The connection was lost."};
-            Reason =:= timeout
-        ->
-            ?tp(
-                warning,
-                gcp_pubsub_request_failed,
-                #{
-                    reason => Reason,
-                    query_mode => sync,
-                    recoverable_error => true,
-                    connector => ResourceId
-                }
-            ),
-            {error, {recoverable_error, Reason}};
-        {error, Reason} = Result ->
-            ?tp(
-                error,
-                gcp_pubsub_request_failed,
-                #{
-                    reason => Reason,
-                    query_mode => sync,
-                    recoverable_error => false,
-                    connector => ResourceId
-                }
-            ),
-            Result;
-        {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
-            ?tp(
-                gcp_pubsub_response,
-                #{
-                    response => Result,
-                    query_mode => sync,
-                    connector => ResourceId
-                }
-            ),
-            Result;
-        {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
-            ?tp(
-                gcp_pubsub_response,
-                #{
-                    response => Result,
-                    query_mode => sync,
-                    connector => ResourceId
-                }
-            ),
-            Result;
-        {ok, StatusCode, RespHeaders} = _Result ->
-            ?tp(
-                gcp_pubsub_response,
-                #{
-                    response => _Result,
-                    query_mode => sync,
-                    connector => ResourceId
-                }
-            ),
-            ?SLOG(error, #{
-                msg => "gcp_pubsub_error_response",
-                request => emqx_connector_http:redact_request(Request),
-                connector => ResourceId,
-                status_code => StatusCode
-            }),
-            {error, #{status_code => StatusCode, headers => RespHeaders}};
-        {ok, StatusCode, RespHeaders, RespBody} = _Result ->
-            ?tp(
-                gcp_pubsub_response,
-                #{
-                    response => _Result,
-                    query_mode => sync,
-                    connector => ResourceId
-                }
-            ),
-            ?SLOG(error, #{
-                msg => "gcp_pubsub_error_response",
-                request => emqx_connector_http:redact_request(Request),
-                connector => ResourceId,
-                status_code => StatusCode,
-                resp_body => RespBody
-            }),
-            {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}}
-    end.
+    Response = ehttpc:request(
+        PoolName,
+        Method,
+        Request,
+        RequestTTL,
+        MaxRetries
+    ),
+    handle_response(Response, ResourceId, _QueryMode = sync).
 
 -spec do_send_requests_async(
     state(),
@@ -412,42 +328,71 @@ do_send_requests_async(
     ),
     {ok, Worker}.
 
--spec reply_delegator(
-    resource_id(),
-    {ReplyFun :: function(), Args :: list()},
-    term() | {error, econnrefused | timeout | term()}
-) -> ok.
-reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
+handle_response(Result, ResourceId, QueryMode) ->
     case Result of
-        {error, Reason} when
-            Reason =:= econnrefused;
-            %% this comes directly from `gun'...
-            Reason =:= {closed, "The connection was lost."};
-            Reason =:= timeout
-        ->
+        {error, Reason} ->
             ?tp(
                 gcp_pubsub_request_failed,
                 #{
                     reason => Reason,
-                    query_mode => async,
-                    recoverable_error => true,
-                    connector => _ResourceId
+                    query_mode => QueryMode,
+                    connector => ResourceId
+                }
+            ),
+            {error, Reason};
+        {ok, StatusCode, RespHeaders} when StatusCode >= 200 andalso StatusCode < 300 ->
+            ?tp(
+                gcp_pubsub_response,
+                #{
+                    response => Result,
+                    query_mode => QueryMode,
+                    connector => ResourceId
                 }
             ),
-            Result1 = {error, {recoverable_error, Reason}},
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
-        _ ->
+            {ok, #{status_code => StatusCode, headers => RespHeaders}};
+        {ok, StatusCode, RespHeaders, RespBody} when
+            StatusCode >= 200 andalso StatusCode < 300
+        ->
             ?tp(
                 gcp_pubsub_response,
                 #{
                     response => Result,
-                    query_mode => async,
-                    connector => _ResourceId
+                    query_mode => QueryMode,
+                    connector => ResourceId
                 }
             ),
-            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
+            {ok, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}};
+        {ok, StatusCode, RespHeaders} = _Result ->
+            ?tp(
+                gcp_pubsub_response,
+                #{
+                    response => _Result,
+                    query_mode => QueryMode,
+                    connector => ResourceId
+                }
+            ),
+            {error, #{status_code => StatusCode, headers => RespHeaders}};
+        {ok, StatusCode, RespHeaders, RespBody} = _Result ->
+            ?tp(
+                gcp_pubsub_response,
+                #{
+                    response => _Result,
+                    query_mode => QueryMode,
+                    connector => ResourceId
+                }
+            ),
+            {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}}
     end.
 
+-spec reply_delegator(
+    resource_id(),
+    {ReplyFun :: function(), Args :: list()},
+    term() | {error, econnrefused | timeout | term()}
+) -> ok.
+reply_delegator(ResourceId, ReplyFunAndArgs, Response) ->
+    Result = handle_response(Response, ResourceId, _QueryMode = async),
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
+
 -spec do_get_status(resource_id(), timer:time()) -> boolean().
 do_get_status(ResourceId, Timeout) ->
     Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)],

+ 76 - 2
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -6,6 +6,7 @@
 
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -type config() :: #{
     connect_timeout := emqx_schema:duration_ms(),
@@ -37,6 +38,8 @@
     on_get_status/2
 ]).
 
+-export([reply_delegator/2]).
+
 %%-------------------------------------------------------------------------------------------------
 %% `emqx_resource' API
 %%-------------------------------------------------------------------------------------------------
@@ -163,7 +166,9 @@ do_send_requests_sync(State, Requests, InstanceId) ->
     Path = publish_path(State),
     Method = post,
     Request = {prepared_request, {Method, Path, Body}},
-    emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState).
+    Result = emqx_bridge_gcp_pubsub_connector:on_query(InstanceId, Request, ConnectorState),
+    QueryMode = sync,
+    handle_result(Result, Request, QueryMode, InstanceId).
 
 -spec do_send_requests_async(
     state(),
@@ -171,7 +176,7 @@ do_send_requests_sync(State, Requests, InstanceId) ->
     {ReplyFun :: function(), Args :: list()},
     resource_id()
 ) -> {ok, pid()}.
-do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) ->
+do_send_requests_async(State, Requests, ReplyFunAndArgs0, InstanceId) ->
     #{connector_state := ConnectorState} = State,
     Payloads =
         lists:map(
@@ -184,6 +189,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, InstanceId) ->
     Path = publish_path(State),
     Method = post,
     Request = {prepared_request, {Method, Path, Body}},
+    ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
     emqx_bridge_gcp_pubsub_connector:on_query_async(
         InstanceId, Request, ReplyFunAndArgs, ConnectorState
     ).
@@ -209,3 +215,71 @@ publish_path(
     }
 ) ->
     <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
+
+handle_result({error, Reason}, _Request, QueryMode, ResourceId) when
+    Reason =:= econnrefused;
+    %% this comes directly from `gun'...
+    Reason =:= {closed, "The connection was lost."};
+    Reason =:= timeout
+->
+    ?tp(
+        warning,
+        gcp_pubsub_request_failed,
+        #{
+            reason => Reason,
+            query_mode => QueryMode,
+            recoverable_error => true,
+            connector => ResourceId
+        }
+    ),
+    {error, {recoverable_error, Reason}};
+handle_result(
+    {error, #{status_code := StatusCode, body := RespBody}} = Result,
+    Request,
+    _QueryMode,
+    ResourceId
+) ->
+    ?SLOG(error, #{
+        msg => "gcp_pubsub_error_response",
+        request => emqx_connector_http:redact_request(Request),
+        connector => ResourceId,
+        status_code => StatusCode,
+        resp_body => RespBody
+    }),
+    Result;
+handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
+    ?SLOG(error, #{
+        msg => "gcp_pubsub_error_response",
+        request => emqx_connector_http:redact_request(Request),
+        connector => ResourceId,
+        status_code => StatusCode
+    }),
+    Result;
+handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
+    ?tp(
+        error,
+        gcp_pubsub_request_failed,
+        #{
+            reason => Reason,
+            query_mode => QueryMode,
+            recoverable_error => false,
+            connector => ResourceId
+        }
+    ),
+    Result;
+handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
+    Result.
+
+reply_delegator(ReplyFunAndArgs, Response) ->
+    case Response of
+        {error, Reason} when
+            Reason =:= econnrefused;
+            %% this comes directly from `gun'...
+            Reason =:= {closed, "The connection was lost."};
+            Reason =:= timeout
+        ->
+            Result1 = {error, {recoverable_error, Reason}},
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
+        _ ->
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response)
+    end.

+ 5 - 5
apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_bridge_gcp_pubsub_SUITE).
+-module(emqx_bridge_gcp_pubsub_producer_SUITE).
 
 -compile(nowarn_export_all).
 -compile(export_all).
@@ -1071,7 +1071,7 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                             #{
                                 ?snk_kind := gcp_pubsub_request_failed,
                                 query_mode := async,
-                                recoverable_error := true
+                                reason := econnrefused
                             },
                             15_000
                         );
@@ -1307,13 +1307,13 @@ t_unrecoverable_error(Config) ->
         {_, {ok, _}} =
             ?wait_async_action(
                 emqx:publish(Message),
-                #{?snk_kind := gcp_pubsub_response},
+                #{?snk_kind := gcp_pubsub_request_failed},
                 5_000
             ),
         fun(Trace) ->
             ?assertMatch(
-                [#{response := {error, killed}}],
-                ?of_kind(gcp_pubsub_response, Trace)
+                [#{reason := killed}],
+                ?of_kind(gcp_pubsub_request_failed, Trace)
             ),
             ok
         end