|
|
@@ -64,6 +64,8 @@
|
|
|
-define(pulsar_client_id, pulsar_client_id).
|
|
|
-define(pulsar_producers, pulsar_producers).
|
|
|
|
|
|
+-define(HEALTH_CHECK_RETRY_TIMEOUT, 4_000).
|
|
|
+
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
%% `emqx_resource' API
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
@@ -440,9 +442,18 @@ render(Message, Template) ->
|
|
|
emqx_placeholder:proc_tmpl(Template, Message, Opts).
|
|
|
|
|
|
get_producer_status(Producers) ->
|
|
|
+ do_get_producer_status(Producers, 0).
|
|
|
+
|
|
|
+do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT ->
|
|
|
+ connecting;
|
|
|
+do_get_producer_status(Producers, TimeSpent) ->
|
|
|
case pulsar_producers:all_connected(Producers) of
|
|
|
- true -> connected;
|
|
|
- false -> connecting
|
|
|
+ true ->
|
|
|
+ connected;
|
|
|
+ false ->
|
|
|
+ Sleep = 200,
|
|
|
+ timer:sleep(Sleep),
|
|
|
+ do_get_producer_status(Producers, TimeSpent + Sleep)
|
|
|
end.
|
|
|
|
|
|
partition_strategy(key_dispatch) -> first_key_dispatch;
|