emqx_bridge_http_test_lib.erl 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_bridge_http_test_lib).
  16. -export([
  17. bridge_type/0,
  18. bridge_name/0,
  19. make_bridge/1,
  20. bridge_async_config/1,
  21. init_http_success_server/1,
  22. success_http_handler/0
  23. ]).
  24. -define(BRIDGE_TYPE, bridge_type()).
  25. -define(BRIDGE_NAME, bridge_name()).
  26. bridge_type() ->
  27. <<"webhook">>.
  28. bridge_name() ->
  29. atom_to_binary(?MODULE).
  30. make_bridge(Config) ->
  31. Type = ?BRIDGE_TYPE,
  32. Name = ?BRIDGE_NAME,
  33. BridgeConfig = bridge_async_config(Config#{
  34. name => Name,
  35. type => Type
  36. }),
  37. {ok, _} = emqx_bridge:create(
  38. Type,
  39. Name,
  40. BridgeConfig
  41. ),
  42. emqx_bridge_resource:bridge_id(Type, Name).
  43. bridge_async_config(#{port := Port} = Config) ->
  44. Type = maps:get(type, Config, ?BRIDGE_TYPE),
  45. Name = maps:get(name, Config, ?BRIDGE_NAME),
  46. Host = maps:get(host, Config, "localhost"),
  47. Path = maps:get(path, Config, ""),
  48. PoolSize = maps:get(pool_size, Config, 1),
  49. QueryMode = maps:get(query_mode, Config, "async"),
  50. ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
  51. RequestTimeout = maps:get(request_timeout, Config, "10s"),
  52. ResumeInterval = maps:get(resume_interval, Config, "1s"),
  53. HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
  54. ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
  55. LocalTopic =
  56. case maps:find(local_topic, Config) of
  57. {ok, LT} ->
  58. lists:flatten(["local_topic = \"", LT, "\""]);
  59. error ->
  60. ""
  61. end,
  62. ConfigString = io_lib:format(
  63. "bridges.~s.~s {\n"
  64. " url = \"http://~s:~p~s\"\n"
  65. " connect_timeout = \"~p\"\n"
  66. " enable = true\n"
  67. %% local_topic
  68. " ~s\n"
  69. " enable_pipelining = 100\n"
  70. " max_retries = 2\n"
  71. " method = \"post\"\n"
  72. " pool_size = ~p\n"
  73. " pool_type = \"random\"\n"
  74. " request_timeout = \"~s\"\n"
  75. " body = \"${id}\"\n"
  76. " resource_opts {\n"
  77. " inflight_window = 100\n"
  78. " health_check_interval = \"~s\"\n"
  79. " max_buffer_bytes = \"1GB\"\n"
  80. " query_mode = \"~s\"\n"
  81. " request_ttl = \"~p\"\n"
  82. " resume_interval = \"~s\"\n"
  83. " start_after_created = \"true\"\n"
  84. " start_timeout = \"5s\"\n"
  85. " worker_pool_size = \"1\"\n"
  86. " }\n"
  87. " ssl {\n"
  88. " enable = false\n"
  89. " }\n"
  90. "}\n",
  91. [
  92. Type,
  93. Name,
  94. Host,
  95. Port,
  96. Path,
  97. ConnectTimeout,
  98. LocalTopic,
  99. PoolSize,
  100. RequestTimeout,
  101. HealthCheckInterval,
  102. QueryMode,
  103. ResourceRequestTTL,
  104. ResumeInterval
  105. ]
  106. ),
  107. ct:pal(ConfigString),
  108. parse_and_check(ConfigString, Type, Name).
  109. parse_and_check(ConfigString, BridgeType, Name) ->
  110. {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
  111. hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
  112. #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
  113. RetConfig.
  114. success_http_handler() ->
  115. success_http_handler(#{response_delay => 0}).
  116. success_http_handler(Opts) ->
  117. ResponseDelay = maps:get(response_delay, Opts, 0),
  118. TestPid = self(),
  119. fun(Req0, State) ->
  120. {ok, Body, Req} = cowboy_req:read_body(Req0),
  121. Headers = cowboy_req:headers(Req),
  122. ct:pal("http request received: ~p", [
  123. #{body => Body, headers => Headers, response_delay => ResponseDelay}
  124. ]),
  125. ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
  126. TestPid ! {http, Headers, Body},
  127. Rep = cowboy_req:reply(
  128. 200,
  129. #{<<"content-type">> => <<"text/plain">>},
  130. <<"hello">>,
  131. Req
  132. ),
  133. {ok, Rep, State}
  134. end.
  135. init_http_success_server(Config) ->
  136. HTTPPath = <<"/path">>,
  137. ServerSSLOpts = false,
  138. {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
  139. _Port = random, HTTPPath, ServerSSLOpts
  140. ),
  141. ResponseDelayMS = 500,
  142. ok = emqx_bridge_http_connector_test_server:set_handler(
  143. success_http_handler(#{response_delay => ResponseDelayMS})
  144. ),
  145. [
  146. {http_server, #{port => HTTPPort, path => HTTPPath}},
  147. {response_delay_ms, ResponseDelayMS},
  148. {bridge_name, ?BRIDGE_NAME}
  149. | Config
  150. ].