Просмотр исходного кода

Merge pull request #9938 from keynslug/feat/mqtt-bridge-async-errors

feat(mqtt-bridge): report recoverable errors of async queries
Andrew Mayorov 3 лет назад
Родитель
Сommit
fe450ca2d9

+ 20 - 1
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -45,6 +45,8 @@
     on_get_status/2
 ]).
 
+-export([on_async_result/2]).
+
 -behaviour(hocon_schema).
 
 -import(hoconsc, [mk/2]).
@@ -194,8 +196,9 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
             classify_error(Reason)
     end.
 
-on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
+on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
+    Callback = {fun on_async_result/2, [CallbackIn]},
     case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
         ok ->
             ok;
@@ -205,6 +208,20 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
             classify_error(Reason)
     end.
 
+on_async_result(Callback, ok) ->
+    apply_callback_function(Callback, ok);
+on_async_result(Callback, {ok, _} = Ok) ->
+    apply_callback_function(Callback, Ok);
+on_async_result(Callback, {error, Reason}) ->
+    apply_callback_function(Callback, classify_error(Reason)).
+
+apply_callback_function(F, Result) when is_function(F) ->
+    erlang:apply(F, [Result]);
+apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
+    erlang:apply(F, A ++ [Result]);
+apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
+    erlang:apply(M, F, A ++ [Result]).
+
 on_get_status(_InstId, #{name := InstanceId}) ->
     emqx_connector_mqtt_worker:status(InstanceId).
 
@@ -214,6 +231,8 @@ classify_error({disconnected, _RC, _} = Reason) ->
     {error, {recoverable_error, Reason}};
 classify_error({shutdown, _} = Reason) ->
     {error, {recoverable_error, Reason}};
+classify_error(shutdown = Reason) ->
+    {error, {recoverable_error, Reason}};
 classify_error(Reason) ->
     {error, {unrecoverable_error, Reason}}.
 

+ 1 - 0
changes/v5.0.17/fix-9938.en.md

@@ -0,0 +1 @@
+Report some egress MQTT bridge errors as recoverable, and thus retryable.

+ 1 - 0
changes/v5.0.17/fix-9938.zh.md

@@ -0,0 +1 @@
+将一些出口 MQTT 网桥错误报告为可恢复,因此可重试。