|
@@ -48,6 +48,7 @@
|
|
|
memory_overload_protection := boolean()
|
|
memory_overload_protection := boolean()
|
|
|
},
|
|
},
|
|
|
compression := compression_mode(),
|
|
compression := compression_mode(),
|
|
|
|
|
+ connect_timeout := emqx_schema:duration_ms(),
|
|
|
max_batch_bytes := emqx_schema:bytesize(),
|
|
max_batch_bytes := emqx_schema:bytesize(),
|
|
|
message := message_template_raw(),
|
|
message := message_template_raw(),
|
|
|
pulsar_topic := binary(),
|
|
pulsar_topic := binary(),
|
|
@@ -80,8 +81,11 @@ on_start(InstanceId, Config) ->
|
|
|
} = Config,
|
|
} = Config,
|
|
|
Servers = format_servers(Servers0),
|
|
Servers = format_servers(Servers0),
|
|
|
ClientId = make_client_id(InstanceId, BridgeName),
|
|
ClientId = make_client_id(InstanceId, BridgeName),
|
|
|
|
|
+ ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
|
|
|
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
|
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
|
|
|
|
+ ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
|
|
|
ClientOpts = #{
|
|
ClientOpts = #{
|
|
|
|
|
+ connect_timeout => ConnectTimeout,
|
|
|
ssl_opts => SSLOpts,
|
|
ssl_opts => SSLOpts,
|
|
|
conn_opts => conn_opts(Config)
|
|
conn_opts => conn_opts(Config)
|
|
|
},
|
|
},
|
|
@@ -96,26 +100,46 @@ on_start(InstanceId, Config) ->
|
|
|
}
|
|
}
|
|
|
);
|
|
);
|
|
|
{error, Reason} ->
|
|
{error, Reason} ->
|
|
|
|
|
+ RedactedReason = emqx_utils:redact(Reason, fun is_sensitive_key/1),
|
|
|
?SLOG(error, #{
|
|
?SLOG(error, #{
|
|
|
msg => "failed_to_start_pulsar_client",
|
|
msg => "failed_to_start_pulsar_client",
|
|
|
instance_id => InstanceId,
|
|
instance_id => InstanceId,
|
|
|
pulsar_hosts => Servers,
|
|
pulsar_hosts => Servers,
|
|
|
- reason => emqx_utils:redact(Reason, fun is_sensitive_key/1)
|
|
|
|
|
|
|
+ reason => RedactedReason
|
|
|
}),
|
|
}),
|
|
|
- throw(failed_to_start_pulsar_client)
|
|
|
|
|
|
|
+ Message =
|
|
|
|
|
+ case get_error_message(RedactedReason) of
|
|
|
|
|
+ {ok, Msg} -> Msg;
|
|
|
|
|
+ error -> failed_to_start_pulsar_client
|
|
|
|
|
+ end,
|
|
|
|
|
+ throw(Message)
|
|
|
end,
|
|
end,
|
|
|
start_producer(Config, InstanceId, ClientId, ClientOpts).
|
|
start_producer(Config, InstanceId, ClientId, ClientOpts).
|
|
|
|
|
|
|
|
-spec on_stop(resource_id(), state()) -> ok.
|
|
-spec on_stop(resource_id(), state()) -> ok.
|
|
|
-on_stop(_InstanceId, State) ->
|
|
|
|
|
- #{
|
|
|
|
|
- pulsar_client_id := ClientId,
|
|
|
|
|
- producers := Producers
|
|
|
|
|
- } = State,
|
|
|
|
|
- stop_producers(ClientId, Producers),
|
|
|
|
|
- stop_client(ClientId),
|
|
|
|
|
- ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}),
|
|
|
|
|
- ok.
|
|
|
|
|
|
|
+on_stop(InstanceId, _State) ->
|
|
|
|
|
+ case emqx_resource:get_allocated_resources(InstanceId) of
|
|
|
|
|
+ #{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
|
|
|
|
|
+ stop_producers(ClientId, Producers),
|
|
|
|
|
+ stop_client(ClientId),
|
|
|
|
|
+ ?tp(pulsar_bridge_stopped, #{
|
|
|
|
|
+ instance_id => InstanceId,
|
|
|
|
|
+ pulsar_client_id => ClientId,
|
|
|
|
|
+ pulsar_producers => Producers
|
|
|
|
|
+ }),
|
|
|
|
|
+ ok;
|
|
|
|
|
+ #{pulsar_client_id := ClientId} ->
|
|
|
|
|
+ stop_client(ClientId),
|
|
|
|
|
+ ?tp(pulsar_bridge_stopped, #{
|
|
|
|
|
+ instance_id => InstanceId,
|
|
|
|
|
+ pulsar_client_id => ClientId,
|
|
|
|
|
+ pulsar_producers => undefined
|
|
|
|
|
+ }),
|
|
|
|
|
+ ok;
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}),
|
|
|
|
|
+ ok
|
|
|
|
|
+ end.
|
|
|
|
|
|
|
|
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
|
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
|
|
on_get_status(_InstanceId, State = #{}) ->
|
|
on_get_status(_InstanceId, State = #{}) ->
|
|
@@ -316,6 +340,8 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
|
|
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
|
|
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
|
|
|
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
|
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
|
|
{ok, Producers} ->
|
|
{ok, Producers} ->
|
|
|
|
|
+ ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
|
|
|
|
|
+ ?tp(pulsar_producer_producers_allocated, #{}),
|
|
|
State = #{
|
|
State = #{
|
|
|
pulsar_client_id => ClientId,
|
|
pulsar_client_id => ClientId,
|
|
|
producers => Producers,
|
|
producers => Producers,
|
|
@@ -422,3 +448,19 @@ partition_strategy(Strategy) -> Strategy.
|
|
|
|
|
|
|
|
is_sensitive_key(auth_data) -> true;
|
|
is_sensitive_key(auth_data) -> true;
|
|
|
is_sensitive_key(_) -> false.
|
|
is_sensitive_key(_) -> false.
|
|
|
|
|
+
|
|
|
|
|
+get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) ->
|
|
|
|
|
+ Iter = maps:iterator(BrokerErrorMap),
|
|
|
|
|
+ do_get_error_message(Iter);
|
|
|
|
|
+get_error_message(_Error) ->
|
|
|
|
|
+ error.
|
|
|
|
|
+
|
|
|
|
|
+do_get_error_message(Iter) ->
|
|
|
|
|
+ case maps:next(Iter) of
|
|
|
|
|
+ {{_Broker, _Port}, #{message := Message}, _NIter} ->
|
|
|
|
|
+ {ok, Message};
|
|
|
|
|
+ {_K, _V, NIter} ->
|
|
|
|
|
+ do_get_error_message(NIter);
|
|
|
|
|
+ none ->
|
|
|
|
|
+ error
|
|
|
|
|
+ end.
|