%% File: emqx_bridge_confluent_tests.erl
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%--------------------------------------------------------------------
  4. -module(emqx_bridge_confluent_tests).
  5. -include_lib("eunit/include/eunit.hrl").
  6. %%===========================================================================
  7. %% Data Section
  8. %%===========================================================================
  %% Returns, as a flat string, the HOCON text for a `confluent_producer'
  %% action named `my_producer'. The document is kept as a list with one
  %% element per line; the result is a leading newline followed by every
  %% line terminated with a newline.
  confluent_producer_action_hocon() ->
      Lines = [
          "actions.confluent_producer.my_producer {",
          "enable = true",
          "connector = my_connector",
          "parameters {",
          "buffer {",
          "memory_overload_protection = false",
          "mode = memory",
          "per_partition_limit = 2GB",
          "segment_bytes = 100MB",
          "}",
          "compression = no_compression",
          "kafka_header_value_encode_mode = none",
          "max_batch_bytes = 896KB",
          "max_inflight = 10",
          "message {",
          "key = \"${.clientid}\"",
          "value = \"${.}\"",
          "}",
          "partition_count_refresh_interval = 60s",
          "partition_strategy = random",
          "query_mode = async",
          "required_acks = all_isr",
          "sync_query_timeout = 5s",
          "topic = test",
          "}",
          "local_topic = \"t/confluent\"",
          "}"
      ],
      "\n" ++ lists:append([Line ++ "\n" || Line <- Lines]).
  %% Returns, as a flat string, the HOCON text for a `confluent_producer'
  %% connector named `my_producer'. The document is kept as a list with one
  %% element per line; the result is a leading newline followed by every
  %% line terminated with a newline.
  confluent_producer_connector_hocon() ->
      Lines = [
          "connectors.confluent_producer.my_producer {",
          " enable = true",
          " authentication {",
          " username = \"user\"",
          " password = \"xxx\"",
          " }",
          " bootstrap_hosts = \"xyz.sa-east1.gcp.confluent.cloud:9092\"",
          " connect_timeout = 5s",
          " metadata_request_timeout = 5s",
          " min_metadata_refresh_interval = 3s",
          " socket_opts {",
          " recbuf = 1024KB",
          " sndbuf = 1024KB",
          " tcp_keepalive = none",
          " }",
          "}"
      ],
      "\n" ++ lists:append([Line ++ "\n" || Line <- Lines]).
  60. %%===========================================================================
  61. %% Helper functions
  62. %%===========================================================================
  %% Parses HOCON text with hocon:binary/1 and returns the resulting config.
  %% Any result other than `{ok, _}' crashes with badmatch, which fails the
  %% calling test.
  parse(HoconText) ->
      {ok, Parsed} = hocon:binary(HoconText),
      Parsed.
  %% Runs hocon_tconf:check_plain/3 on the map Conf against schema module
  %% SchemaMod and returns whatever that call returns (or lets it throw).
  %% Only maps are accepted; anything else fails with function_clause.
  %% NOTE(review): `required => false' presumably relaxes required-field
  %% enforcement -- confirm against the hocon_tconf documentation.
  check(SchemaMod, #{} = Conf) ->
      CheckOpts = #{required => false},
      hocon_tconf:check_plain(SchemaMod, Conf, CheckOpts).
  %% Checks an action config map against the `emqx_bridge_v2_schema' module
  %% by delegating to check/2.
  check_action(#{} = ActionConf) ->
      check(emqx_bridge_v2_schema, ActionConf).
  %% Checks a connector config map against the `emqx_connector_schema' module
  %% by delegating to check/2.
  check_connector(#{} = ConnectorConf) ->
      check(emqx_connector_schema, ConnectorConf).
  %% Match pattern for a term of the shape
  %%   {SchemaMod, [#{kind := validation_error, reason := _, value := _}]}
  %% The arguments are spliced in as patterns, so ReasonPat and ValuePat may
  %% themselves be partial patterns. This has to be a macro rather than a
  %% function because it expands to a pattern (`:=' map fields).
  -define(validation_error(SchemaMod, ReasonPat, ValuePat),
      {SchemaMod, [#{kind := validation_error, reason := ReasonPat, value := ValuePat}]}
  ).
  %% ?validation_error/3 specialised to the action schema module.
  -define(action_validation_error(ReasonPat, ValuePat),
      ?validation_error(emqx_bridge_v2_schema, ReasonPat, ValuePat)
  ).
  %% ?validation_error/3 specialised to the connector schema module.
  -define(connector_validation_error(ReasonPat, ValuePat),
      ?validation_error(emqx_connector_schema, ReasonPat, ValuePat)
  ).
  %% Match pattern for a config map whose RootKey entry contains
  %% `confluent_producer' -> `my_producer' -> CfgPat (all binary keys).
  %% CfgPat is spliced in as a pattern, so it may be `_' or a partial map.
  -define(ok_config(RootKey, CfgPat), #{
      RootKey := #{<<"confluent_producer">> := #{<<"my_producer">> := CfgPat}}
  }).
  %% ?ok_config/2 rooted at the `connectors' key.
  -define(ok_connector_config(CfgPat), ?ok_config(<<"connectors">>, CfgPat)).
  %% ?ok_config/2 rooted at the `actions' key.
  -define(ok_action_config(CfgPat), ?ok_config(<<"actions">>, CfgPat)).
  99. %%===========================================================================
  100. %% Test cases
  101. %%===========================================================================
  %% EUnit generator for the connector schema checks. It covers the base
  %% config (expects mechanism `plain' and SSL enabled with `verify_none' in
  %% the checked result) plus three overrides that must make check_connector/1
  %% throw a validation error: SSL disabled, and the two SCRAM mechanisms.
  confluent_producer_connector_test_() ->
      %% Touch emqx_bridge_enterprise so that the module is loaded even when
      %% only this file's tests are run.
      _ = emqx_bridge_enterprise:module_info(),
      Base = parse(confluent_producer_connector_hocon()),
      %% Wraps Patch under connectors.confluent_producer.my_producer and
      %% combines it with the base config via emqx_utils_maps:deep_merge/2.
      Patched = fun(Patch) ->
          Wrapped = #{
              <<"connectors">> => #{
                  <<"confluent_producer">> => #{<<"my_producer">> => Patch}
              }
          },
          emqx_utils_maps:deep_merge(Base, Wrapped)
      end,
      %% Base config with only the authentication mechanism replaced.
      WithMechanism = fun(Mechanism) ->
          Patched(#{<<"authentication">> => #{<<"mechanism">> => Mechanism}})
      end,
      BaseConfig =
          {"base config",
              ?_assertMatch(
                  ?ok_connector_config(#{
                      <<"authentication">> := #{<<"mechanism">> := plain},
                      <<"ssl">> := #{<<"enable">> := true, <<"verify">> := verify_none}
                  }),
                  check_connector(Base)
              )},
      SslDisabled =
          {"ssl disabled",
              ?_assertThrow(
                  ?connector_validation_error(#{expected := "true"}, "false"),
                  check_connector(Patched(#{<<"ssl">> => #{<<"enable">> => <<"false">>}}))
              )},
      ScramSha256 =
          {"bad authn mechanism: scram sha256",
              ?_assertThrow(
                  ?connector_validation_error(#{expected := "plain"}, "scram_sha_256"),
                  check_connector(WithMechanism(<<"scram_sha_256">>))
              )},
      ScramSha512 =
          {"bad authn mechanism: scram sha512",
              ?_assertThrow(
                  ?connector_validation_error(#{expected := "plain"}, "scram_sha_512"),
                  check_connector(WithMechanism(<<"scram_sha_512">>))
              )},
      [BaseConfig, SslDisabled, ScramSha256, ScramSha512].
  %% EUnit generator for the action schema checks: the base action config
  %% must pass check_action/1 and come back under
  %% actions.confluent_producer.my_producer.
  confluent_producer_action_test_() ->
      %% Touch emqx_bridge_enterprise so that the module is loaded even when
      %% only this file's tests are run.
      _ = emqx_bridge_enterprise:module_info(),
      ActionConf = parse(confluent_producer_action_hocon()),
      BaseConfig =
          {"base config", ?_assertMatch(?ok_action_config(_), check_action(ActionConf))},
      [BaseConfig].