Просмотр исходного кода

fix(pulsar): handle `Redirect` in `LookupTopicResponse`

See https://github.com/emqx/pulsar-client-erl/pull/68

Fixes https://emqx.atlassian.net/browse/EMQX-13572
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
26f59b0154

+ 1 - 1
apps/emqx_bridge_pulsar/mix.exs

@@ -25,7 +25,7 @@ defmodule EMQXBridgePulsar.MixProject do
     [
     [
       UMP.common_dep(:crc32cer),
       UMP.common_dep(:crc32cer),
       UMP.common_dep(:snappyer),
       UMP.common_dep(:snappyer),
-      {:pulsar, github: "emqx/pulsar-client-erl", tag: "1.0.0"},
+      {:pulsar, github: "emqx/pulsar-client-erl", tag: "2.0.0"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true},
       {:emqx_resource, in_umbrella: true},
       {:emqx_bridge, in_umbrella: true, runtime: false}
       {:emqx_bridge, in_umbrella: true, runtime: false}

+ 1 - 1
apps/emqx_bridge_pulsar/rebar.config

@@ -2,7 +2,7 @@
 
 
 {erl_opts, [debug_info]}.
 {erl_opts, [debug_info]}.
 {deps, [
 {deps, [
-    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "1.0.0"}}},
+    {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "2.0.0"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}},
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}
     {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 8 - 13
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -89,7 +89,7 @@ on_start(ConnResId, Config) ->
         conn_opts => conn_opts(Config)
         conn_opts => conn_opts(Config)
     },
     },
     case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
     case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
-        {ok, _Pid} ->
+        {ok, _} ->
             ?tp(
             ?tp(
                 info,
                 info,
                 "pulsar_client_started",
                 "pulsar_client_started",
@@ -189,18 +189,13 @@ on_stop(ConnResId, _State) ->
 -spec on_get_status(resource_id(), state()) -> connected | connecting.
 -spec on_get_status(resource_id(), state()) -> connected | connecting.
 on_get_status(_ConnResId, State = #{}) ->
 on_get_status(_ConnResId, State = #{}) ->
     #{client_id := ClientId} = State,
     #{client_id := ClientId} = State,
-    case pulsar_client_sup:find_client(ClientId) of
-        {ok, Pid} ->
-            try pulsar_client:get_status(Pid) of
-                true -> ?status_connected;
-                false -> ?status_connecting
-            catch
-                exit:{timeout, _} ->
-                    ?status_connecting;
-                exit:{noproc, _} ->
-                    ?status_connecting
-            end;
-        {error, _} ->
+    try pulsar_client_manager:get_status(ClientId, 5_000) of
+        true -> ?status_connected;
+        false -> ?status_connecting
+    catch
+        exit:{timeout, _} ->
+            ?status_connecting;
+        exit:{noproc, _} ->
             ?status_connecting
             ?status_connecting
     end;
     end;
 on_get_status(_ConnResId, _State) ->
 on_get_status(_ConnResId, _State) ->

+ 1 - 1
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl

@@ -400,7 +400,7 @@ start_consumer(TestCase, Config) ->
         cacertfile => filename:join([CertsPath, "cacert.pem"])
         cacertfile => filename:join([CertsPath, "cacert.pem"])
     },
     },
     Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
     Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
-    {ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
+    {ok, _} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
     ConsumerOpts = Opts#{
     ConsumerOpts = Opts#{
         cb_init_args => #{send_to => self()},
         cb_init_args => #{send_to => self()},
         cb_module => pulsar_echo_consumer,
         cb_module => pulsar_echo_consumer,

+ 1 - 1
apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl

@@ -275,7 +275,7 @@ start_consumer(TestCase, Config) ->
         cacertfile => filename:join([CertsPath, "cacert.pem"])
         cacertfile => filename:join([CertsPath, "cacert.pem"])
     },
     },
     Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
     Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
-    {ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
+    {ok, _} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
     ConsumerOpts = Opts#{
     ConsumerOpts = Opts#{
         cb_init_args => #{send_to => self()},
         cb_init_args => #{send_to => self()},
         cb_module => pulsar_echo_consumer,
         cb_module => pulsar_echo_consumer,