| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%--------------------------------------------------------------------
- -module(emqx_bridge_confluent_tests).
- -include_lib("eunit/include/eunit.hrl").
- %%===========================================================================
- %% Data Section
- %%===========================================================================
- %% erlfmt-ignore
%% Returns the HOCON text of a complete `confluent_producer` action named
%% `my_producer`; it is the base fixture parsed by the action test generator.
%% The text is written as one literal per HOCON line (adjacent string
%% literals are concatenated at compile time), starting with an empty line.
confluent_producer_action_hocon() ->
    "\n"
    "actions.confluent_producer.my_producer {\n"
    "enable = true\n"
    "connector = my_connector\n"
    "parameters {\n"
    "buffer {\n"
    "memory_overload_protection = false\n"
    "mode = memory\n"
    "per_partition_limit = 2GB\n"
    "segment_bytes = 100MB\n"
    "}\n"
    "compression = no_compression\n"
    "kafka_header_value_encode_mode = none\n"
    "max_batch_bytes = 896KB\n"
    "max_inflight = 10\n"
    "message {\n"
    "key = \"${.clientid}\"\n"
    "value = \"${.}\"\n"
    "}\n"
    "partition_count_refresh_interval = 60s\n"
    "partition_strategy = random\n"
    "query_mode = async\n"
    "required_acks = all_isr\n"
    "sync_query_timeout = 5s\n"
    "topic = test\n"
    "}\n"
    "local_topic = \"t/confluent\"\n"
    "}\n".
%% Returns the HOCON text of a `confluent_producer` connector named
%% `my_producer`; it is the base fixture parsed by the connector test
%% generator. Note that the fixture sets no `ssl` section and no
%% authentication `mechanism`.
confluent_producer_connector_hocon() ->
    "\nconnectors.confluent_producer.my_producer {\n"
    " enable = true\n"
    " authentication {\n"
    " username = \"user\"\n"
    " password = \"xxx\"\n"
    " }\n"
    " bootstrap_hosts = \"xyz.sa-east1.gcp.confluent.cloud:9092\"\n"
    " connect_timeout = 5s\n"
    " metadata_request_timeout = 5s\n"
    " min_metadata_refresh_interval = 3s\n"
    " socket_opts {\n"
    " recbuf = 1024KB\n"
    " sndbuf = 1024KB\n"
    " tcp_keepalive = none\n"
    " }\n"
    "}\n".
- %%===========================================================================
- %% Helper functions
- %%===========================================================================
%% Parses HOCON text with `hocon:binary/1` and returns the resulting config.
%% Any result other than `{ok, _}` crashes with `badmatch`, which fails the
%% calling test.
parse(Hocon) ->
    {ok, Parsed} = hocon:binary(Hocon),
    Parsed.
%% Checks the config map `Conf` against schema module `SchemaMod` using
%% `hocon_tconf:check_plain/3` with the option `required => false`, and
%% returns whatever that call returns. Only accepts maps.
check(SchemaMod, Conf) when is_map(Conf) ->
    Opts = #{required => false},
    hocon_tconf:check_plain(SchemaMod, Conf, Opts).
%% Checks an action config map against the `emqx_bridge_v2_schema` schema.
check_action(Conf) when is_map(Conf) ->
    SchemaMod = emqx_bridge_v2_schema,
    check(SchemaMod, Conf).
%% Checks a connector config map against the `emqx_connector_schema` schema.
check_connector(Conf) when is_map(Conf) ->
    SchemaMod = emqx_connector_schema,
    check(SchemaMod, Conf).
%% Match pattern for a schema validation failure as asserted by the tests in
%% this module: a tuple of the schema module and a one-element list holding a
%% map whose `kind` is `validation_error`. `ReasonPat` and `ValuePat` are
%% patterns for that map's `reason` and `value` entries.
-define(validation_error(Schema, ReasonPat, ValuePat),
    {Schema, [#{kind := validation_error, reason := ReasonPat, value := ValuePat}]}
).
%% `?validation_error` pattern specialised to the action schema module,
%% `emqx_bridge_v2_schema`.
-define(action_validation_error(ReasonPat, ValuePat),
    ?validation_error(emqx_bridge_v2_schema, ReasonPat, ValuePat)
).
%% `?validation_error` pattern specialised to the connector schema module,
%% `emqx_connector_schema`.
-define(connector_validation_error(ReasonPat, ValuePat),
    ?validation_error(emqx_connector_schema, ReasonPat, ValuePat)
).
%% Match pattern for a checked config map: under `Root`, then
%% `<<"confluent_producer">>`, then `<<"my_producer">>`, the value must match
%% `CfgPat`.
-define(ok_config(Root, CfgPat), #{
    Root := #{<<"confluent_producer">> := #{<<"my_producer">> := CfgPat}}
}).
%% `?ok_config` pattern rooted at the `<<"connectors">>` key.
-define(ok_connector_config(CfgPat), ?ok_config(<<"connectors">>, CfgPat)).
%% `?ok_config` pattern rooted at the `<<"actions">>` key.
-define(ok_action_config(CfgPat), ?ok_config(<<"actions">>, CfgPat)).
- %%===========================================================================
- %% Test cases
- %%===========================================================================
%% EUnit test generator for the `confluent_producer` connector schema.
%% Checks the base fixture, then three overrides of it that the schema check
%% is expected to reject by throwing a validation error.
confluent_producer_connector_test_() ->
    %% ensure this module is loaded when testing only this file
    _ = emqx_bridge_enterprise:module_info(),
    Base = parse(confluent_producer_connector_hocon()),
    %% Merges `ProducerCfg` over the `my_producer` connector of the base config.
    WithProducerCfg = fun(ProducerCfg) ->
        Patch = #{
            <<"connectors">> => #{
                <<"confluent_producer">> => #{<<"my_producer">> => ProducerCfg}
            }
        },
        emqx_utils_maps:deep_merge(Base, Patch)
    end,
    %% Base config with only the authentication mechanism replaced.
    WithAuthnMechanism = fun(Mechanism) ->
        WithProducerCfg(#{<<"authentication">> => #{<<"mechanism">> => Mechanism}})
    end,
    [
        %% The fixture sets neither `mechanism` nor `ssl`; the checked config
        %% must nevertheless carry these values.
        {"base config",
            ?_assertMatch(
                ?ok_connector_config(#{
                    <<"authentication">> := #{<<"mechanism">> := plain},
                    <<"ssl">> := #{
                        <<"enable">> := true,
                        <<"verify">> := verify_none
                    }
                }),
                check_connector(Base)
            )},
        {"ssl disabled",
            ?_assertThrow(
                ?connector_validation_error(#{expected := "true"}, "false"),
                check_connector(
                    WithProducerCfg(#{<<"ssl">> => #{<<"enable">> => <<"false">>}})
                )
            )},
        {"bad authn mechanism: scram sha256",
            ?_assertThrow(
                ?connector_validation_error(#{expected := "plain"}, "scram_sha_256"),
                check_connector(WithAuthnMechanism(<<"scram_sha_256">>))
            )},
        {"bad authn mechanism: scram sha512",
            ?_assertThrow(
                ?connector_validation_error(#{expected := "plain"}, "scram_sha_512"),
                check_connector(WithAuthnMechanism(<<"scram_sha_512">>))
            )}
    ].
%% EUnit test generator for the `confluent_producer` action schema: the base
%% fixture must pass the action schema check and keep the `my_producer`
%% entry under `actions.confluent_producer`.
confluent_producer_action_test_() ->
    %% ensure this module is loaded when testing only this file
    _ = emqx_bridge_enterprise:module_info(),
    ActionConf = parse(confluent_producer_action_hocon()),
    BaseConfigTest =
        {"base config", ?_assertMatch(?ok_action_config(_), check_action(ActionConf))},
    [BaseConfigTest].
|