emqx_authz_api_sources_SUITE.erl 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. %%--------------------------------------------------------------------
  15. -module(emqx_authz_api_sources_SUITE).
  16. -compile(nowarn_export_all).
  17. -compile(export_all).
  18. -import(emqx_mgmt_api_test_util, [request/3, uri/1]).
  19. -include_lib("eunit/include/eunit.hrl").
  20. -include_lib("emqx/include/emqx_placeholder.hrl").
  21. -define(MONGO_SINGLE_HOST, "mongo").
  22. -define(MYSQL_HOST, "mysql:3306").
  23. -define(PGSQL_HOST, "pgsql").
  24. -define(REDIS_SINGLE_HOST, "redis").
  25. -define(SOURCE1, #{
  26. <<"type">> => <<"http">>,
  27. <<"enable">> => true,
  28. <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>,
  29. <<"ssl">> => #{<<"enable">> => true},
  30. <<"headers">> => #{},
  31. <<"method">> => <<"get">>,
  32. <<"request_timeout">> => <<"5s">>
  33. }).
  34. -define(SOURCE2, #{
  35. <<"type">> => <<"mongodb">>,
  36. <<"enable">> => true,
  37. <<"mongo_type">> => <<"single">>,
  38. <<"server">> => <<?MONGO_SINGLE_HOST>>,
  39. <<"w_mode">> => <<"unsafe">>,
  40. <<"pool_size">> => 1,
  41. <<"database">> => <<"mqtt">>,
  42. <<"ssl">> => #{<<"enable">> => false},
  43. <<"collection">> => <<"fake">>,
  44. <<"filter">> => #{<<"a">> => <<"b">>}
  45. }).
  46. -define(SOURCE3, #{
  47. <<"type">> => <<"mysql">>,
  48. <<"enable">> => true,
  49. <<"server">> => <<?MYSQL_HOST>>,
  50. <<"pool_size">> => 1,
  51. <<"database">> => <<"mqtt">>,
  52. <<"username">> => <<"xx">>,
  53. <<"password">> => <<"ee">>,
  54. <<"auto_reconnect">> => true,
  55. <<"ssl">> => #{<<"enable">> => false},
  56. <<"query">> => <<"abcb">>
  57. }).
  58. -define(SOURCE4, #{
  59. <<"type">> => <<"postgresql">>,
  60. <<"enable">> => true,
  61. <<"server">> => <<?PGSQL_HOST>>,
  62. <<"pool_size">> => 1,
  63. <<"database">> => <<"mqtt">>,
  64. <<"username">> => <<"xx">>,
  65. <<"password">> => <<"ee">>,
  66. <<"auto_reconnect">> => true,
  67. <<"ssl">> => #{<<"enable">> => false},
  68. <<"query">> => <<"abcb">>
  69. }).
  70. -define(SOURCE5, #{
  71. <<"type">> => <<"redis">>,
  72. <<"enable">> => true,
  73. <<"servers">> => <<?REDIS_SINGLE_HOST, ",127.0.0.1:6380">>,
  74. <<"redis_type">> => <<"cluster">>,
  75. <<"pool_size">> => 1,
  76. <<"password">> => <<"ee">>,
  77. <<"auto_reconnect">> => true,
  78. <<"ssl">> => #{<<"enable">> => false},
  79. <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>>
  80. }).
  81. -define(SOURCE6, #{
  82. <<"type">> => <<"file">>,
  83. <<"enable">> => true,
  84. <<"rules">> =>
  85. <<
  86. "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}."
  87. "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}."
  88. >>
  89. }).
  90. all() ->
  91. emqx_common_test_helpers:all(?MODULE).
  92. groups() ->
  93. [].
  94. init_per_suite(Config) ->
  95. ok = stop_apps([emqx_resource]),
  96. meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
  97. meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end),
  98. meck:expect(emqx_resource, health_check, fun(St) -> {ok, St} end),
  99. meck:expect(emqx_resource, remove_local, fun(_) -> ok end),
  100. meck:expect(
  101. emqx_authz,
  102. acl_conf_file,
  103. fun() ->
  104. emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
  105. end
  106. ),
  107. ok = emqx_mgmt_api_test_util:init_suite(
  108. [emqx_conf, emqx_authz],
  109. fun set_special_configs/1
  110. ),
  111. ok = start_apps([emqx_resource]),
  112. Config.
  113. end_per_suite(_Config) ->
  114. {ok, _} = emqx:update_config(
  115. [authorization],
  116. #{
  117. <<"no_match">> => <<"allow">>,
  118. <<"cache">> => #{<<"enable">> => <<"true">>},
  119. <<"sources">> => []
  120. }
  121. ),
  122. %% resource and connector should be stop first,
  123. %% or authz_[mysql|pgsql|redis..]_SUITE would be failed
  124. ok = stop_apps([emqx_resource]),
  125. emqx_mgmt_api_test_util:end_suite([emqx_authz, emqx_conf]),
  126. meck:unload(emqx_resource),
  127. ok.
  128. set_special_configs(emqx_dashboard) ->
  129. emqx_dashboard_api_test_helpers:set_default_config();
  130. set_special_configs(emqx_authz) ->
  131. {ok, _} = emqx:update_config([authorization, cache, enable], false),
  132. {ok, _} = emqx:update_config([authorization, no_match], deny),
  133. {ok, _} = emqx:update_config([authorization, sources], []),
  134. ok;
  135. set_special_configs(_App) ->
  136. ok.
  137. init_per_testcase(t_api, Config) ->
  138. meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
  139. meck:expect(emqx_utils, gen_id, fun() -> "fake" end),
  140. meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
  141. meck:expect(
  142. emqx,
  143. data_dir,
  144. fun() ->
  145. {data_dir, Data} = lists:keyfind(data_dir, 1, Config),
  146. Data
  147. end
  148. ),
  149. Config;
  150. init_per_testcase(_, Config) ->
  151. Config.
  152. end_per_testcase(t_api, _Config) ->
  153. meck:unload(emqx_utils),
  154. meck:unload(emqx),
  155. ok;
  156. end_per_testcase(_, _Config) ->
  157. ok.
  158. %%------------------------------------------------------------------------------
  159. %% Testcases
  160. %%------------------------------------------------------------------------------
  161. t_api(_) ->
  162. {ok, 200, Result1} = request(get, uri(["authorization", "sources"]), []),
  163. ?assertEqual([], get_sources(Result1)),
  164. {ok, 404, ErrResult} = request(get, uri(["authorization", "sources", "http"]), []),
  165. ?assertMatch(
  166. #{<<"code">> := <<"NOT_FOUND">>, <<"message">> := <<"Not found: http">>},
  167. emqx_utils_json:decode(ErrResult, [return_maps])
  168. ),
  169. [
  170. begin
  171. {ok, 204, _} = request(post, uri(["authorization", "sources"]), Source)
  172. end
  173. || Source <- lists:reverse([?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6])
  174. ],
  175. {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
  176. {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
  177. Sources = get_sources(Result2),
  178. ?assertMatch(
  179. [
  180. #{<<"type">> := <<"http">>},
  181. #{<<"type">> := <<"mongodb">>},
  182. #{<<"type">> := <<"mysql">>},
  183. #{<<"type">> := <<"postgresql">>},
  184. #{<<"type">> := <<"redis">>},
  185. #{<<"type">> := <<"file">>}
  186. ],
  187. Sources
  188. ),
  189. ?assert(filelib:is_file(emqx_authz:acl_conf_file())),
  190. {ok, 204, _} = request(
  191. put,
  192. uri(["authorization", "sources", "http"]),
  193. ?SOURCE1#{<<"enable">> := false}
  194. ),
  195. {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []),
  196. ?assertMatch(
  197. #{<<"type">> := <<"http">>, <<"enable">> := false},
  198. emqx_utils_json:decode(Result3, [return_maps])
  199. ),
  200. Keyfile = emqx_common_test_helpers:app_path(
  201. emqx,
  202. filename:join(["etc", "certs", "key.pem"])
  203. ),
  204. Certfile = emqx_common_test_helpers:app_path(
  205. emqx,
  206. filename:join(["etc", "certs", "cert.pem"])
  207. ),
  208. Cacertfile = emqx_common_test_helpers:app_path(
  209. emqx,
  210. filename:join(["etc", "certs", "cacert.pem"])
  211. ),
  212. {ok, 204, _} = request(
  213. put,
  214. uri(["authorization", "sources", "mongodb"]),
  215. ?SOURCE2#{
  216. <<"ssl">> => #{
  217. <<"enable">> => <<"true">>,
  218. <<"cacertfile">> => Cacertfile,
  219. <<"certfile">> => Certfile,
  220. <<"keyfile">> => Keyfile,
  221. <<"verify">> => <<"verify_none">>
  222. }
  223. }
  224. ),
  225. {ok, 200, Result4} = request(get, uri(["authorization", "sources", "mongodb"]), []),
  226. {ok, 200, Status4} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
  227. #{
  228. <<"metrics">> := #{
  229. <<"allow">> := 0,
  230. <<"deny">> := 0,
  231. <<"total">> := 0,
  232. <<"nomatch">> := 0
  233. }
  234. } = emqx_utils_json:decode(Status4, [return_maps]),
  235. ?assertMatch(
  236. #{
  237. <<"type">> := <<"mongodb">>,
  238. <<"ssl">> := #{
  239. <<"enable">> := <<"true">>,
  240. <<"cacertfile">> := _,
  241. <<"certfile">> := _,
  242. <<"keyfile">> := _,
  243. <<"verify">> := <<"verify_none">>
  244. }
  245. },
  246. emqx_utils_json:decode(Result4, [return_maps])
  247. ),
  248. {ok, Cacert} = file:read_file(Cacertfile),
  249. {ok, Cert} = file:read_file(Certfile),
  250. {ok, Key} = file:read_file(Keyfile),
  251. {ok, 204, _} = request(
  252. put,
  253. uri(["authorization", "sources", "mongodb"]),
  254. ?SOURCE2#{
  255. <<"ssl">> => #{
  256. <<"enable">> => <<"true">>,
  257. <<"cacertfile">> => Cacert,
  258. <<"certfile">> => Cert,
  259. <<"keyfile">> => Key,
  260. <<"verify">> => <<"verify_none">>
  261. }
  262. }
  263. ),
  264. {ok, 200, Result5} = request(get, uri(["authorization", "sources", "mongodb"]), []),
  265. ?assertMatch(
  266. #{
  267. <<"type">> := <<"mongodb">>,
  268. <<"ssl">> := #{
  269. <<"enable">> := <<"true">>,
  270. <<"cacertfile">> := _,
  271. <<"certfile">> := _,
  272. <<"keyfile">> := _,
  273. <<"verify">> := <<"verify_none">>
  274. }
  275. },
  276. emqx_utils_json:decode(Result5, [return_maps])
  277. ),
  278. {ok, 200, Status5_1} = request(get, uri(["authorization", "sources", "mongodb", "status"]), []),
  279. #{
  280. <<"metrics">> := #{
  281. <<"allow">> := 0,
  282. <<"deny">> := 0,
  283. <<"total">> := 0,
  284. <<"nomatch">> := 0
  285. }
  286. } = emqx_utils_json:decode(Status5_1, [return_maps]),
  287. #{
  288. ssl := #{
  289. cacertfile := SavedCacertfile,
  290. certfile := SavedCertfile,
  291. keyfile := SavedKeyfile
  292. }
  293. } = emqx_authz:lookup(mongodb),
  294. ?assert(filelib:is_file(SavedCacertfile)),
  295. ?assert(filelib:is_file(SavedCertfile)),
  296. ?assert(filelib:is_file(SavedKeyfile)),
  297. {ok, 204, _} = request(
  298. put,
  299. uri(["authorization", "sources", "mysql"]),
  300. ?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>}
  301. ),
  302. {ok, 204, _} = request(
  303. put,
  304. uri(["authorization", "sources", "postgresql"]),
  305. ?SOURCE4#{<<"server">> := <<"fake">>}
  306. ),
  307. {ok, 204, _} = request(
  308. put,
  309. uri(["authorization", "sources", "redis"]),
  310. ?SOURCE5#{
  311. <<"servers">> := [
  312. <<"192.168.1.100:6379">>,
  313. <<"192.168.1.100:6380">>
  314. ]
  315. }
  316. ),
  317. {ok, 400, TypeMismatch} = request(
  318. put,
  319. uri(["authorization", "sources", "file"]),
  320. #{<<"type">> => <<"built_in_database">>, <<"enable">> => false}
  321. ),
  322. ?assertMatch(
  323. #{
  324. <<"code">> := <<"BAD_REQUEST">>,
  325. <<"message">> := <<"Type mismatch", _/binary>>
  326. },
  327. emqx_utils_json:decode(TypeMismatch, [return_maps])
  328. ),
  329. lists:foreach(
  330. fun(#{<<"type">> := Type}) ->
  331. {ok, 204, _} = request(
  332. delete,
  333. uri(["authorization", "sources", binary_to_list(Type)]),
  334. []
  335. )
  336. end,
  337. Sources
  338. ),
  339. {ok, 200, Result6} = request(get, uri(["authorization", "sources"]), []),
  340. ?assertEqual([], get_sources(Result6)),
  341. ?assertEqual([], emqx:get_config([authorization, sources])),
  342. lists:foreach(
  343. fun(#{<<"type">> := Type}) ->
  344. {ok, 404, _} = request(
  345. get,
  346. uri(["authorization", "sources", binary_to_list(Type), "status"]),
  347. []
  348. ),
  349. {ok, 404, _} = request(
  350. post,
  351. uri(["authorization", "sources", binary_to_list(Type), "move"]),
  352. #{<<"position">> => <<"front">>}
  353. ),
  354. {ok, 404, _} = request(
  355. get,
  356. uri(["authorization", "sources", binary_to_list(Type)]),
  357. []
  358. ),
  359. {ok, 404, _} = request(
  360. delete,
  361. uri(["authorization", "sources", binary_to_list(Type)]),
  362. []
  363. )
  364. end,
  365. Sources
  366. ),
  367. {ok, 404, _TypeMismatch2} = request(
  368. put,
  369. uri(["authorization", "sources", "file"]),
  370. #{<<"type">> => <<"built_in_database">>, <<"enable">> => false}
  371. ),
  372. {ok, 404, _} = request(
  373. put,
  374. uri(["authorization", "sources", "built_in_database"]),
  375. #{<<"type">> => <<"built_in_database">>, <<"enable">> => false}
  376. ),
  377. {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6),
  378. {ok, Client} = emqtt:start_link(
  379. [
  380. {username, <<"u_event3">>},
  381. {clientid, <<"c_event3">>},
  382. {proto_ver, v5},
  383. {properties, #{'Session-Expiry-Interval' => 60}}
  384. ]
  385. ),
  386. emqtt:connect(Client),
  387. emqtt:publish(
  388. Client,
  389. <<"t1">>,
  390. #{'Message-Expiry-Interval' => 60},
  391. <<"{\"id\": 1, \"name\": \"ha\"}">>,
  392. [{qos, 1}]
  393. ),
  394. snabbkaffe:retry(
  395. 10,
  396. 3,
  397. fun() ->
  398. {ok, 200, Status5} = request(
  399. get, uri(["authorization", "sources", "file", "status"]), []
  400. ),
  401. #{
  402. <<"metrics">> := #{
  403. <<"allow">> := 1,
  404. <<"deny">> := 0,
  405. <<"total">> := 1,
  406. <<"nomatch">> := 0
  407. }
  408. } = emqx_utils_json:decode(Status5, [return_maps])
  409. end
  410. ),
  411. emqtt:publish(
  412. Client,
  413. <<"t2">>,
  414. #{'Message-Expiry-Interval' => 60},
  415. <<"{\"id\": 1, \"name\": \"ha\"}">>,
  416. [{qos, 1}]
  417. ),
  418. snabbkaffe:retry(
  419. 10,
  420. 3,
  421. fun() ->
  422. {ok, 200, Status6} = request(
  423. get, uri(["authorization", "sources", "file", "status"]), []
  424. ),
  425. #{
  426. <<"metrics">> := #{
  427. <<"allow">> := 2,
  428. <<"deny">> := 0,
  429. <<"total">> := 2,
  430. <<"nomatch">> := 0
  431. }
  432. } = emqx_utils_json:decode(Status6, [return_maps])
  433. end
  434. ),
  435. emqtt:publish(
  436. Client,
  437. <<"t3">>,
  438. #{'Message-Expiry-Interval' => 60},
  439. <<"{\"id\": 1, \"name\": \"ha\"}">>,
  440. [{qos, 1}]
  441. ),
  442. snabbkaffe:retry(
  443. 10,
  444. 3,
  445. fun() ->
  446. {ok, 200, Status7} = request(
  447. get, uri(["authorization", "sources", "file", "status"]), []
  448. ),
  449. #{
  450. <<"metrics">> := #{
  451. <<"allow">> := 3,
  452. <<"deny">> := 0,
  453. <<"total">> := 3,
  454. <<"nomatch">> := 0
  455. }
  456. } = emqx_utils_json:decode(Status7, [return_maps])
  457. end
  458. ),
  459. ok.
  460. t_source_move(_) ->
  461. {ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5]),
  462. ?assertMatch(
  463. [
  464. #{type := http},
  465. #{type := mongodb},
  466. #{type := mysql},
  467. #{type := postgresql},
  468. #{type := redis}
  469. ],
  470. emqx_authz:lookup()
  471. ),
  472. {ok, 204, _} = request(
  473. post,
  474. uri(["authorization", "sources", "postgresql", "move"]),
  475. #{<<"position">> => <<"front">>}
  476. ),
  477. ?assertMatch(
  478. [
  479. #{type := postgresql},
  480. #{type := http},
  481. #{type := mongodb},
  482. #{type := mysql},
  483. #{type := redis}
  484. ],
  485. emqx_authz:lookup()
  486. ),
  487. {ok, 204, _} = request(
  488. post,
  489. uri(["authorization", "sources", "http", "move"]),
  490. #{<<"position">> => <<"rear">>}
  491. ),
  492. ?assertMatch(
  493. [
  494. #{type := postgresql},
  495. #{type := mongodb},
  496. #{type := mysql},
  497. #{type := redis},
  498. #{type := http}
  499. ],
  500. emqx_authz:lookup()
  501. ),
  502. {ok, 204, _} = request(
  503. post,
  504. uri(["authorization", "sources", "mysql", "move"]),
  505. #{<<"position">> => <<"before:postgresql">>}
  506. ),
  507. ?assertMatch(
  508. [
  509. #{type := mysql},
  510. #{type := postgresql},
  511. #{type := mongodb},
  512. #{type := redis},
  513. #{type := http}
  514. ],
  515. emqx_authz:lookup()
  516. ),
  517. {ok, 204, _} = request(
  518. post,
  519. uri(["authorization", "sources", "mongodb", "move"]),
  520. #{<<"position">> => <<"after:http">>}
  521. ),
  522. ?assertMatch(
  523. [
  524. #{type := mysql},
  525. #{type := postgresql},
  526. #{type := redis},
  527. #{type := http},
  528. #{type := mongodb}
  529. ],
  530. emqx_authz:lookup()
  531. ),
  532. ok.
  533. t_aggregate_metrics(_) ->
  534. Metrics = #{
  535. 'emqx@node1.emqx.io' => #{
  536. metrics =>
  537. #{
  538. failed => 0,
  539. total => 1,
  540. rate => 0.0,
  541. rate_last5m => 0.0,
  542. rate_max => 0.1,
  543. success => 1
  544. }
  545. },
  546. 'emqx@node2.emqx.io' => #{
  547. metrics =>
  548. #{
  549. failed => 0,
  550. total => 1,
  551. rate => 0.0,
  552. rate_last5m => 0.0,
  553. rate_max => 0.1,
  554. success => 1
  555. }
  556. }
  557. },
  558. Res = emqx_authn_api:aggregate_metrics(maps:values(Metrics)),
  559. ?assertEqual(
  560. #{
  561. metrics =>
  562. #{
  563. failed => 0,
  564. total => 2,
  565. rate => 0.0,
  566. rate_last5m => 0.0,
  567. rate_max => 0.2,
  568. success => 2
  569. }
  570. },
  571. Res
  572. ).
  573. get_sources(Result) ->
  574. maps:get(<<"sources">>, emqx_utils_json:decode(Result, [return_maps])).
  575. data_dir() -> emqx:data_dir().
  576. start_apps(Apps) ->
  577. lists:foreach(fun application:ensure_all_started/1, Apps).
  578. stop_apps(Apps) ->
  579. lists:foreach(fun application:stop/1, Apps).