|
@@ -20,16 +20,84 @@
|
|
|
all() ->
|
|
all() ->
|
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
|
|
|
|
|
|
+wait_until_kafka_is_up() ->
|
|
|
|
|
+ wait_until_kafka_is_up(0).
|
|
|
|
|
+
|
|
|
|
|
+wait_until_kafka_is_up(90) ->
|
|
|
|
|
+ ct:fail("Kafka is not up even though we have waited for a while");
|
|
|
|
|
+wait_until_kafka_is_up(Attempts) ->
|
|
|
|
|
+ KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
+ case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of
|
|
|
|
|
+ {ok, _} ->
|
|
|
|
|
+ ok;
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ timer:sleep(1000),
|
|
|
|
|
+ wait_until_kafka_is_up(Attempts + 1)
|
|
|
|
|
+ end.
|
|
|
|
|
+
|
|
|
init_per_suite(Config) ->
|
|
init_per_suite(Config) ->
|
|
|
{ok, _} = application:ensure_all_started(brod),
|
|
{ok, _} = application:ensure_all_started(brod),
|
|
|
{ok, _} = application:ensure_all_started(wolff),
|
|
{ok, _} = application:ensure_all_started(wolff),
|
|
|
|
|
+ wait_until_kafka_is_up(),
|
|
|
Config.
|
|
Config.
|
|
|
|
|
|
|
|
end_per_suite(_) ->
|
|
end_per_suite(_) ->
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
-do_publish(Conf, KafkaTopic, InstId) ->
|
|
|
|
|
- Time = erlang:system_time(millisecond),
|
|
|
|
|
|
|
+t_publish_no_auth(_CtConfig) ->
|
|
|
|
|
+ publish_with_and_without_ssl("none").
|
|
|
|
|
+
|
|
|
|
|
+t_publish_sasl_plain(_CtConfig) ->
|
|
|
|
|
+ publish_with_and_without_ssl(valid_sasl_plain_settings()).
|
|
|
|
|
+
|
|
|
|
|
+t_publish_sasl_scram256(_CtConfig) ->
|
|
|
|
|
+ publish_with_and_without_ssl(valid_sasl_scram256_settings()).
|
|
|
|
|
+
|
|
|
|
|
+t_publish_sasl_scram512(_CtConfig) ->
|
|
|
|
|
+ publish_with_and_without_ssl(valid_sasl_scram512_settings()).
|
|
|
|
|
+
|
|
|
|
|
+t_publish_sasl_kerberos(_CtConfig) ->
|
|
|
|
|
+ publish_with_and_without_ssl(valid_sasl_kerberos_settings()).
|
|
|
|
|
+
|
|
|
|
|
+publish_with_and_without_ssl(AuthSettings) ->
|
|
|
|
|
+ publish_helper(#{
|
|
|
|
|
+ auth_settings => AuthSettings,
|
|
|
|
|
+ ssl_settings => #{}
|
|
|
|
|
+ }),
|
|
|
|
|
+ publish_helper(#{
|
|
|
|
|
+ auth_settings => AuthSettings,
|
|
|
|
|
+ ssl_settings => valid_ssl_settings()
|
|
|
|
|
+ }).
|
|
|
|
|
+
|
|
|
|
|
+publish_helper(#{
|
|
|
|
|
+ auth_settings := AuthSettings,
|
|
|
|
|
+ ssl_settings := SSLSettings
|
|
|
|
|
+}) ->
|
|
|
|
|
+ HostsString =
|
|
|
|
|
+ case {AuthSettings, SSLSettings} of
|
|
|
|
|
+ {"none", Map} when map_size(Map) =:= 0 ->
|
|
|
|
|
+ kafka_hosts_string();
|
|
|
|
|
+ {"none", Map} when map_size(Map) =/= 0 ->
|
|
|
|
|
+ kafka_hosts_string_ssl();
|
|
|
|
|
+ {_, Map} when map_size(Map) =:= 0 ->
|
|
|
|
|
+ kafka_hosts_string_sasl();
|
|
|
|
|
+ {_, _} ->
|
|
|
|
|
+ kafka_hosts_string_ssl_sasl()
|
|
|
|
|
+ end,
|
|
|
|
|
+ Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]),
|
|
|
|
|
+ Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
|
|
|
|
|
+ InstId = emqx_bridge_resource:resource_id("kafka", Name),
|
|
|
|
|
+ KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
+ Conf = config(#{
|
|
|
|
|
+ "authentication" => AuthSettings,
|
|
|
|
|
+ "kafka_hosts_string" => HostsString,
|
|
|
|
|
+ "kafka_topic" => KafkaTopic,
|
|
|
|
|
+ "instance_id" => InstId,
|
|
|
|
|
+ "ssl" => SSLSettings
|
|
|
|
|
+ }),
|
|
|
|
|
+ %% To make sure we get unique value
|
|
|
|
|
+ timer:sleep(1),
|
|
|
|
|
+ Time = erlang:monotonic_time(),
|
|
|
BinTime = integer_to_binary(Time),
|
|
BinTime = integer_to_binary(Time),
|
|
|
Msg = #{
|
|
Msg = #{
|
|
|
clientid => BinTime,
|
|
clientid => BinTime,
|
|
@@ -47,79 +115,10 @@ do_publish(Conf, KafkaTopic, InstId) ->
|
|
|
ok = ?PRODUCER:on_stop(InstId, State),
|
|
ok = ?PRODUCER:on_stop(InstId, State),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
-t_publish(_CtConfig) ->
|
|
|
|
|
- InstId = emqx_bridge_resource:resource_id("kafka", "NoAuthInst"),
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- Conf = config(#{
|
|
|
|
|
- "authentication" => "none",
|
|
|
|
|
- "kafka_hosts_string" => kafka_hosts_string(),
|
|
|
|
|
- "kafka_topic" => KafkaTopic,
|
|
|
|
|
- "instance_id" => InstId
|
|
|
|
|
- }),
|
|
|
|
|
- do_publish(Conf, KafkaTopic, InstId).
|
|
|
|
|
-
|
|
|
|
|
-t_publish_sasl_plain(_CtConfig) ->
|
|
|
|
|
- InstId = emqx_bridge_resource:resource_id("kafka", "SASLPlainInst"),
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- Conf = config(#{
|
|
|
|
|
- "authentication" => #{
|
|
|
|
|
- "mechanism" => "plain",
|
|
|
|
|
- "username" => "emqxuser",
|
|
|
|
|
- "password" => "password"
|
|
|
|
|
- },
|
|
|
|
|
- "kafka_hosts_string" => kafka_hosts_string_sasl(),
|
|
|
|
|
- "kafka_topic" => KafkaTopic,
|
|
|
|
|
- "instance_id" => InstId
|
|
|
|
|
- }),
|
|
|
|
|
- do_publish(Conf, KafkaTopic, InstId).
|
|
|
|
|
-
|
|
|
|
|
-t_publish_sasl_scram256(_CtConfig) ->
|
|
|
|
|
- InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram256Inst"),
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- Conf = config(#{
|
|
|
|
|
- "authentication" => #{
|
|
|
|
|
- "mechanism" => "scram_sha_256",
|
|
|
|
|
- "username" => "emqxuser",
|
|
|
|
|
- "password" => "password"
|
|
|
|
|
- },
|
|
|
|
|
- "kafka_hosts_string" => kafka_hosts_string_sasl(),
|
|
|
|
|
- "kafka_topic" => KafkaTopic,
|
|
|
|
|
- "instance_id" => InstId
|
|
|
|
|
- }),
|
|
|
|
|
- do_publish(Conf, KafkaTopic, InstId).
|
|
|
|
|
-
|
|
|
|
|
-t_publish_sasl_scram512(_CtConfig) ->
|
|
|
|
|
- InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram512Inst"),
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- Conf = config(#{
|
|
|
|
|
- "authentication" => #{
|
|
|
|
|
- "mechanism" => "scram_sha_512",
|
|
|
|
|
- "username" => "emqxuser",
|
|
|
|
|
- "password" => "password"
|
|
|
|
|
- },
|
|
|
|
|
- "kafka_hosts_string" => kafka_hosts_string_sasl(),
|
|
|
|
|
- "kafka_topic" => KafkaTopic,
|
|
|
|
|
- "instance_id" => InstId
|
|
|
|
|
- }),
|
|
|
|
|
- do_publish(Conf, KafkaTopic, InstId).
|
|
|
|
|
-
|
|
|
|
|
-t_publish_sasl_kerberos(_CtConfig) ->
|
|
|
|
|
- InstId = emqx_bridge_resource:resource_id("kafka", "SASLKerberosInst"),
|
|
|
|
|
- KafkaTopic = "test-topic-one-partition",
|
|
|
|
|
- Conf = config(#{
|
|
|
|
|
- "authentication" => #{
|
|
|
|
|
- "kerberos_principal" => "rig@KDC.EMQX.NET",
|
|
|
|
|
- "kerberos_keytab_file" => "/var/lib/secret/rig.key"
|
|
|
|
|
- },
|
|
|
|
|
- "kafka_hosts_string" => kafka_hosts_string_sasl(),
|
|
|
|
|
- "kafka_topic" => KafkaTopic,
|
|
|
|
|
- "instance_id" => InstId
|
|
|
|
|
- }),
|
|
|
|
|
- do_publish(Conf, KafkaTopic, InstId).
|
|
|
|
|
-
|
|
|
|
|
config(Args) ->
|
|
config(Args) ->
|
|
|
- {ok, Conf} = hocon:binary(hocon_config(Args)),
|
|
|
|
|
|
|
+ ConfText = hocon_config(Args),
|
|
|
|
|
+ ct:pal("Running tests with conf:\n~s", [ConfText]),
|
|
|
|
|
+ {ok, Conf} = hocon:binary(ConfText),
|
|
|
#{config := Parsed} = hocon_tconf:check_plain(
|
|
#{config := Parsed} = hocon_tconf:check_plain(
|
|
|
emqx_ee_bridge_kafka,
|
|
emqx_ee_bridge_kafka,
|
|
|
#{<<"config">> => Conf},
|
|
#{<<"config">> => Conf},
|
|
@@ -132,9 +131,15 @@ hocon_config(Args) ->
|
|
|
AuthConf = maps:get("authentication", Args),
|
|
AuthConf = maps:get("authentication", Args),
|
|
|
AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
|
|
AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
|
|
|
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
|
|
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
|
|
|
|
|
+ SSLConf = maps:get("ssl", Args, #{}),
|
|
|
|
|
+ SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
|
|
|
|
|
+ SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
|
|
|
Hocon = bbmustache:render(
|
|
Hocon = bbmustache:render(
|
|
|
iolist_to_binary(hocon_config_template()),
|
|
iolist_to_binary(hocon_config_template()),
|
|
|
- Args#{"authentication" => AuthConfRendered}
|
|
|
|
|
|
|
+ Args#{
|
|
|
|
|
+ "authentication" => AuthConfRendered,
|
|
|
|
|
+ "ssl" => SSLConfRendered
|
|
|
|
|
+ }
|
|
|
),
|
|
),
|
|
|
Hocon.
|
|
Hocon.
|
|
|
|
|
|
|
@@ -144,6 +149,7 @@ hocon_config_template() ->
|
|
|
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
|
|
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
|
|
|
enable = true
|
|
enable = true
|
|
|
authentication = {{{ authentication }}}
|
|
authentication = {{{ authentication }}}
|
|
|
|
|
+ssl = {{{ ssl }}}
|
|
|
producer = {
|
|
producer = {
|
|
|
mqtt {
|
|
mqtt {
|
|
|
topic = \"t/#\"
|
|
topic = \"t/#\"
|
|
@@ -173,12 +179,65 @@ hocon_config_template_authentication(#{"kerberos_principal" := _}) ->
|
|
|
}
|
|
}
|
|
|
""".
|
|
""".
|
|
|
|
|
|
|
|
|
|
+%% erlfmt-ignore
|
|
|
|
|
+hocon_config_template_ssl(Map) when map_size(Map) =:= 0 ->
|
|
|
|
|
+"""
|
|
|
|
|
+{
|
|
|
|
|
+ enable = false
|
|
|
|
|
+}
|
|
|
|
|
+""";
|
|
|
|
|
+hocon_config_template_ssl(_) ->
|
|
|
|
|
+"""
|
|
|
|
|
+{
|
|
|
|
|
+ enable = true
|
|
|
|
|
+ cacertfile = \"{{{cacertfile}}}\"
|
|
|
|
|
+ certfile = \"{{{certfile}}}\"
|
|
|
|
|
+ keyfile = \"{{{keyfile}}}\"
|
|
|
|
|
+}
|
|
|
|
|
+""".
|
|
|
|
|
+
|
|
|
kafka_hosts_string() ->
|
|
kafka_hosts_string() ->
|
|
|
"kafka-1.emqx.net:9092,".
|
|
"kafka-1.emqx.net:9092,".
|
|
|
|
|
|
|
|
kafka_hosts_string_sasl() ->
|
|
kafka_hosts_string_sasl() ->
|
|
|
"kafka-1.emqx.net:9093,".
|
|
"kafka-1.emqx.net:9093,".
|
|
|
|
|
|
|
|
|
|
+kafka_hosts_string_ssl() ->
|
|
|
|
|
+ "kafka-1.emqx.net:9094,".
|
|
|
|
|
+
|
|
|
|
|
+kafka_hosts_string_ssl_sasl() ->
|
|
|
|
|
+ "kafka-1.emqx.net:9095,".
|
|
|
|
|
+
|
|
|
|
|
+valid_ssl_settings() ->
|
|
|
|
|
+ #{
|
|
|
|
|
+ "cacertfile" => <<"/var/lib/secret/ca.crt">>,
|
|
|
|
|
+ "certfile" => <<"/var/lib/secret/client.crt">>,
|
|
|
|
|
+ "keyfile" => <<"/var/lib/secret/client.key">>
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
|
|
+valid_sasl_plain_settings() ->
|
|
|
|
|
+ #{
|
|
|
|
|
+ "mechanism" => "plain",
|
|
|
|
|
+ "username" => "emqxuser",
|
|
|
|
|
+ "password" => "password"
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
|
|
+valid_sasl_scram256_settings() ->
|
|
|
|
|
+ (valid_sasl_plain_settings())#{
|
|
|
|
|
+ "mechanism" => "scram_sha_256"
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
|
|
+valid_sasl_scram512_settings() ->
|
|
|
|
|
+ (valid_sasl_plain_settings())#{
|
|
|
|
|
+ "mechanism" => "scram_sha_512"
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
|
|
+valid_sasl_kerberos_settings() ->
|
|
|
|
|
+ #{
|
|
|
|
|
+ "kerberos_principal" => "rig@KDC.EMQX.NET",
|
|
|
|
|
+ "kerberos_keytab_file" => "/var/lib/secret/rig.keytab"
|
|
|
|
|
+ }.
|
|
|
|
|
+
|
|
|
kafka_hosts() ->
|
|
kafka_hosts() ->
|
|
|
kpro:parse_endpoints(kafka_hosts_string()).
|
|
kpro:parse_endpoints(kafka_hosts_string()).
|
|
|
|
|
|