|
@@ -112,18 +112,19 @@ on_start(
|
|
|
sync_timeout => SyncTimeout,
|
|
sync_timeout => SyncTimeout,
|
|
|
templates => Templates,
|
|
templates => Templates,
|
|
|
producers_map_pid => ProducersMapPID,
|
|
producers_map_pid => ProducersMapPID,
|
|
|
- producers_opts => ProducerOpts
|
|
|
|
|
|
|
+ producers_opts => emqx_secret:wrap(ProducerOpts)
|
|
|
},
|
|
},
|
|
|
|
|
|
|
|
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
|
|
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
|
|
|
{ok, _Pid} ->
|
|
{ok, _Pid} ->
|
|
|
{ok, State};
|
|
{ok, State};
|
|
|
- {error, _Reason} = Error ->
|
|
|
|
|
|
|
+ {error, Reason0} ->
|
|
|
|
|
+ Reason = redact(Reason0),
|
|
|
?tp(
|
|
?tp(
|
|
|
rocketmq_connector_start_failed,
|
|
rocketmq_connector_start_failed,
|
|
|
- #{error => _Reason}
|
|
|
|
|
|
|
+ #{error => Reason}
|
|
|
),
|
|
),
|
|
|
- Error
|
|
|
|
|
|
|
+ {error, Reason}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
|
|
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
|
|
@@ -220,7 +221,7 @@ safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, R
|
|
|
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
|
|
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
|
|
|
catch
|
|
catch
|
|
|
_Type:Reason ->
|
|
_Type:Reason ->
|
|
|
- {error, {unrecoverable_error, Reason}}
|
|
|
|
|
|
|
+ {error, {unrecoverable_error, redact(Reason)}}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) ->
|
|
produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) ->
|
|
@@ -335,7 +336,7 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
|
|
|
_ ->
|
|
_ ->
|
|
|
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]),
|
|
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]),
|
|
|
{ok, Producers0} = rocketmq:ensure_supervised_producers(
|
|
{ok, Producers0} = rocketmq:ensure_supervised_producers(
|
|
|
- ClientId, ProducerGroup, Topic1, ProducerOpts
|
|
|
|
|
|
|
+ ClientId, ProducerGroup, Topic1, emqx_secret:unwrap(ProducerOpts)
|
|
|
),
|
|
),
|
|
|
ets:insert(ClientId, {TopicKey, Producers0}),
|
|
ets:insert(ClientId, {TopicKey, Producers0}),
|
|
|
Producers0
|
|
Producers0
|