|
|
@@ -385,7 +385,6 @@ start_consumer(TestCase, Config) ->
|
|
|
<<Scheme/binary, (list_to_binary(PulsarHost))/binary, ":",
|
|
|
(integer_to_binary(PulsarPort))/binary>>
|
|
|
),
|
|
|
- ConnOpts = #{},
|
|
|
ConsumerClientId = list_to_atom(
|
|
|
atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer())
|
|
|
),
|
|
|
@@ -396,15 +395,9 @@ start_consumer(TestCase, Config) ->
|
|
|
certfile => filename:join([CertsPath, "cert.pem"]),
|
|
|
cacertfile => filename:join([CertsPath, "cacert.pem"])
|
|
|
},
|
|
|
- {ok, _ClientPid} = pulsar:ensure_supervised_client(
|
|
|
- ConsumerClientId,
|
|
|
- [URL],
|
|
|
- #{
|
|
|
- conn_opts => ConnOpts,
|
|
|
- ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)
|
|
|
- }
|
|
|
- ),
|
|
|
- ConsumerOpts = #{
|
|
|
+ Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)},
|
|
|
+ {ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts),
|
|
|
+ ConsumerOpts = Opts#{
|
|
|
cb_init_args => #{send_to => self()},
|
|
|
cb_module => pulsar_echo_consumer,
|
|
|
sub_type => 'Shared',
|
|
|
@@ -414,8 +407,7 @@ start_consumer(TestCase, Config) ->
|
|
|
%% id, or else weird bugs will happen, like the
|
|
|
%% consumer never starts...
|
|
|
name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())),
|
|
|
- consumer_id => 1,
|
|
|
- conn_opts => ConnOpts
|
|
|
+ consumer_id => 1
|
|
|
},
|
|
|
{ok, Consumer} = pulsar:ensure_supervised_consumers(
|
|
|
ConsumerClientId,
|