Просмотр исходного кода

chore: bump `ecpool` to 0.5.4

With fixed typings and empty pool handling.
Andrew Mayorov 2 лет назад
Родитель
Сommit
eed9358abd
3 измененных файлов с 8 добавлено и 14 удалено
  1. 6 12
      apps/emqx_connector/src/emqx_connector_mqtt.erl
  2. 1 1
      mix.exs
  3. 1 1
      rebar.config

+ 6 - 12
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -162,7 +162,7 @@ on_query(
     #{egress_pool_name := PoolName, egress_config := Config}
 ) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
-    handle_send_result(with_worker(PoolName, send, [Msg, Config]));
+    handle_send_result(with_egress_client(PoolName, send, [Msg, Config]));
 on_query(ResourceId, {send_message, Msg}, #{}) ->
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
@@ -179,7 +179,7 @@ on_query_async(
 ) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
     Callback = {fun on_async_result/2, [CallbackIn]},
-    Result = with_worker(PoolName, send_async, [Msg, Callback, Config]),
+    Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]),
     case Result of
         ok ->
             ok;
@@ -196,16 +196,8 @@ on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) ->
         reason => "Egress is not configured"
     }).
 
-with_worker(ResourceId, Fun, Args) ->
-    Worker = ecpool:get_client(ResourceId),
-    case is_pid(Worker) andalso ecpool_worker:client(Worker) of
-        {ok, Client} ->
-            erlang:apply(emqx_connector_mqtt_egress, Fun, [Client | Args]);
-        {error, Reason} ->
-            {error, Reason};
-        false ->
-            {error, disconnected}
-    end.
+with_egress_client(ResourceId, Fun, Args) ->
+    ecpool:pick_and_do(ResourceId, {emqx_connector_mqtt_egress, Fun, Args}, no_handover).
 
 on_async_result(Callback, Result) ->
     apply_callback_function(Callback, handle_send_result(Result)).
@@ -233,6 +225,8 @@ classify_reply(Reply = #{reason_code := _}) ->
 
 classify_error(disconnected = Reason) ->
     {recoverable_error, Reason};
+classify_error(ecpool_empty) ->
+    {recoverable_error, disconnected};
 classify_error({disconnected, _RC, _} = Reason) ->
     {recoverable_error, Reason};
 classify_error({shutdown, _} = Reason) ->

+ 1 - 1
mix.exs

@@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true},
-      {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
+      {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer

+ 1 - 1
rebar.config

@@ -66,7 +66,7 @@
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
     , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}}
-    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
+    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}