emqx_persistent_session_ds_state_tests.erl 14 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_ds_state_tests).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include_lib("proper/include/proper.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. %%================================================================================
  22. %% Type declarations
  23. %%================================================================================
  24. -define(tab, ?MODULE).
  25. -define(DB, emqx_persistent_session).
  26. %% Note: here `committed' != `dirty'. It means "has been committed at
  27. %% least once since the creation", and it's used by the iteration
  28. %% test.
  29. -record(s, {subs = #{}, metadata = #{}, streams = #{}, seqno = #{}, committed = false}).
  30. -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
  31. -define(metadata_domain, metadata).
  32. -define(metadata_domain_bin, <<"metadata">>).
  33. -define(subscription_domain, subscription).
  34. -define(subscription_state_domain, subscription_state).
  35. -define(stream_domain, stream).
  36. -define(rank_domain, rank).
  37. -define(seqno_domain, seqno).
  38. -define(awaiting_rel_domain, awaiting_rel).
  39. %%================================================================================
  40. %% Properties
  41. %%================================================================================
  42. seqno_proper_test_() ->
  43. Props = [prop_consistency()],
  44. Opts = [{numtests, 10}, {to_file, user}, {max_size, 100}],
  45. {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
  46. prop_consistency() ->
  47. ?FORALL(
  48. Cmds,
  49. commands(?MODULE),
  50. begin
  51. init(),
  52. {_History, State, Result} = run_commands(?MODULE, Cmds),
  53. clean(),
  54. ?WHENFAIL(
  55. io:format(
  56. user,
  57. "Operations: ~p~nState: ~p\nResult: ~p~n",
  58. [Cmds, State, Result]
  59. ),
  60. aggregate(command_names(Cmds), Result =:= ok)
  61. )
  62. end
  63. ).
  64. -ifdef(STORE_STATE_IN_DS).
  65. %% Verifies that our internal keys generated for stream keys preserve the order relation
  66. %% between them.
  67. stream_order_internal_keys_proper_test_() ->
  68. Props = [prop_stream_order_internal_keys()],
  69. Opts = [{numtests, 100}, {to_file, user}, {max_size, 100}],
  70. {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
  71. prop_stream_order_internal_keys() ->
  72. ?FORALL(
  73. {Id, Streams0},
  74. {session_id(), list({non_neg_integer(), value_gen(), stream_state()})},
  75. try
  76. init(),
  77. Streams = lists:uniq(Streams0),
  78. StreamKeys = [{R, S} || {R, S, _SS} <- Streams],
  79. ExpectedRanks = lists:sort([R || {R, _S, _SS} <- Streams]),
  80. S = lists:foldl(
  81. fun({R, S, SS}, Acc) ->
  82. emqx_persistent_session_ds_state:put_stream({R, S}, SS, Acc)
  83. end,
  84. emqx_persistent_session_ds_state:create_new(Id),
  85. Streams
  86. ),
  87. RevRanks = emqx_persistent_session_ds_state:fold_streams(
  88. fun({R, _S}, _SS, Acc) -> [R | Acc] end,
  89. [],
  90. S
  91. ),
  92. Ranks = lists:reverse(RevRanks),
  93. ?WHENFAIL(
  94. io:format(
  95. user,
  96. "Expected ranks:\n ~p\nRanks:\n ~p\nStream keys:\n ~p\n",
  97. [ExpectedRanks, Ranks, StreamKeys]
  98. ),
  99. ExpectedRanks =:= Ranks
  100. )
  101. after
  102. clean()
  103. end
  104. ).
  105. %% -ifdef(STORE_STATE_IN_DS).
  106. -endif.
  107. %%================================================================================
  108. %% Generators
  109. %%================================================================================
  110. -define(n_sessions, 10).
  111. session_id() ->
  112. oneof([integer_to_binary(I) || I <- lists:seq(1, ?n_sessions)]).
  113. topic() ->
  114. oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]).
  115. subscription() ->
  116. oneof([#{}]).
  117. session_id(S) ->
  118. oneof(maps:keys(S)).
  119. batch_size() ->
  120. range(1, ?n_sessions).
  121. put_metadata() ->
  122. oneof([
  123. ?LET(
  124. Val,
  125. range(0, 100),
  126. {last_alive_at, set_last_alive_at, Val}
  127. ),
  128. ?LET(
  129. Val,
  130. range(0, 100),
  131. {created_at, set_created_at, Val}
  132. )
  133. ]).
  134. get_metadata() ->
  135. oneof([
  136. {last_alive_at, get_last_alive_at},
  137. {created_at, get_created_at}
  138. ]).
  139. seqno_track() ->
  140. range(0, 1).
  141. seqno() ->
  142. range(1, 100).
  143. -ifdef(STORE_STATE_IN_DS).
  144. stream_id() ->
  145. {range(1, 3), oneof([#{}, {}])}.
  146. %% ELSE ifdef(STORE_STATE_IN_DS).
  147. -else.
  148. stream_id() ->
  149. %% Note: this does not match the stream id type used in practice, which is a
  150. %% `{emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}'
  151. range(1, 1).
  152. %% END ifdef(STORE_STATE_IN_DS).
  153. -endif.
  154. stream_state() ->
  155. oneof([#{}]).
  156. put_req() ->
  157. oneof([
  158. ?LET(
  159. {Id, Stream},
  160. {stream_id(), stream_state()},
  161. {#s.streams, put_stream, Id, Stream}
  162. ),
  163. ?LET(
  164. {Track, Seqno},
  165. {seqno_track(), seqno()},
  166. {#s.seqno, put_seqno, Track, Seqno}
  167. ),
  168. ?LET(
  169. {Topic, Subscription},
  170. {topic(), subscription()},
  171. {#s.subs, put_subscription, Topic, Subscription}
  172. )
  173. ]).
  174. get_req() ->
  175. oneof([
  176. {#s.streams, get_stream, stream_id()},
  177. {#s.seqno, get_seqno, seqno_track()},
  178. {#s.subs, get_subscription, topic()}
  179. ]).
  180. del_req() ->
  181. oneof([
  182. {#s.streams, del_stream, stream_id()},
  183. {#s.subs, del_subscription, topic()}
  184. ]).
  185. value_gen() ->
  186. oneof([#{}, loose_tuple(oneof([range(1, 3), binary()]))]).
  187. session_id_gen() ->
  188. frequency([
  189. {5, clientid()},
  190. {1, <<"a/">>},
  191. {1, <<"a/b">>},
  192. {1, <<"a/+">>},
  193. {1, <<"a/+/#">>},
  194. {1, <<"#">>},
  195. {1, <<"+">>},
  196. {1, <<"/">>}
  197. ]).
  198. clientid() ->
  199. %% empty string is not valid...
  200. ?SUCHTHAT(ClientId, emqx_proper_types:clientid(), ClientId =/= <<>>).
  201. domain_gen() ->
  202. oneof([
  203. ?metadata_domain,
  204. ?subscription_domain,
  205. ?subscription_state_domain,
  206. ?stream_domain,
  207. ?rank_domain,
  208. ?seqno_domain,
  209. ?awaiting_rel_domain
  210. ]).
  211. key_gen(?metadata_domain) ->
  212. <<"metadata">>;
  213. key_gen(?stream_domain) ->
  214. ?LET(
  215. {Rank, X},
  216. {integer(), integer()},
  217. <<Rank:64, X:64>>
  218. );
  219. key_gen(_) ->
  220. integer().
  221. command(S) ->
  222. case maps:size(S) > 0 of
  223. true ->
  224. frequency([
  225. %% Global CRUD operations:
  226. {1, {call, ?MODULE, create_new, [session_id()]}},
  227. {1, {call, ?MODULE, delete, [session_id(S)]}},
  228. {2, {call, ?MODULE, reopen, [session_id(S)]}},
  229. {2, {call, ?MODULE, commit, [session_id(S)]}},
  230. %% Metadata:
  231. {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}},
  232. {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}},
  233. %% Key-value:
  234. {3, {call, ?MODULE, gen_put, [session_id(S), put_req()]}},
  235. {3, {call, ?MODULE, gen_get, [session_id(S), get_req()]}},
  236. {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}},
  237. %% Getters:
  238. {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
  239. ]);
  240. false ->
  241. frequency([
  242. {1, {call, ?MODULE, create_new, [session_id()]}},
  243. {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
  244. ])
  245. end.
  246. precondition(_, _) ->
  247. true.
  248. postcondition(S, {call, ?MODULE, iterate_sessions, [_]}, Result) ->
  249. {Sessions, _} = lists:unzip(Result),
  250. %% No lingering sessions:
  251. ?assertMatch([], Sessions -- maps:keys(S)),
  252. %% All committed sessions are visited by the iterator:
  253. CommittedSessions = lists:sort([K || {K, #s{committed = true}} <- maps:to_list(S)]),
  254. ?assertMatch([], CommittedSessions -- Sessions),
  255. true;
  256. postcondition(S, {call, ?MODULE, get_metadata, [SessionId, {MetaKey, _Fun}]}, Result) ->
  257. #{SessionId := #s{metadata = Meta}} = S,
  258. ?assertEqual(
  259. maps:get(MetaKey, Meta, undefined),
  260. Result,
  261. #{session_id => SessionId, meta => MetaKey}
  262. ),
  263. true;
  264. postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) ->
  265. #{SessionId := Record} = S,
  266. ?assertEqual(
  267. maps:get(Key, element(Idx, Record), undefined),
  268. Result,
  269. #{session_id => SessionId, key => Key, 'fun' => Fun}
  270. ),
  271. true;
  272. postcondition(_, _, _) ->
  273. true.
  274. next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
  275. S#{SessionId => #s{}};
  276. next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
  277. maps:remove(SessionId, S);
  278. next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
  279. update(
  280. SessionId,
  281. #s.metadata,
  282. fun(Map) -> Map#{Key => Val} end,
  283. S
  284. );
  285. next_state(S, _V, {call, ?MODULE, gen_put, [SessionId, {Idx, _Fun, Key, Val}]}) ->
  286. update(
  287. SessionId,
  288. Idx,
  289. fun(Map) -> Map#{Key => Val} end,
  290. S
  291. );
  292. next_state(S, _V, {call, ?MODULE, gen_del, [SessionId, {Idx, _Fun, Key}]}) ->
  293. update(
  294. SessionId,
  295. Idx,
  296. fun(Map) -> maps:remove(Key, Map) end,
  297. S
  298. );
  299. next_state(S, _V, {call, ?MODULE, commit, [SessionId]}) ->
  300. update(
  301. SessionId,
  302. #s.committed,
  303. fun(_) -> true end,
  304. S
  305. );
  306. next_state(S, _V, {call, ?MODULE, _, _}) ->
  307. S.
  308. initial_state() ->
  309. #{}.
  310. %%================================================================================
  311. %% Operations
  312. %%================================================================================
  313. create_new(SessionId) ->
  314. put_state(SessionId, emqx_persistent_session_ds_state:create_new(SessionId)).
  315. delete(SessionId) ->
  316. emqx_persistent_session_ds_state:delete(SessionId),
  317. ets:delete(?tab, SessionId).
  318. commit(SessionId) ->
  319. put_state(SessionId, emqx_persistent_session_ds_state:commit(get_state(SessionId))).
  320. reopen(SessionId) ->
  321. _ = emqx_persistent_session_ds_state:commit(get_state(SessionId)),
  322. {ok, S} = emqx_persistent_session_ds_state:open(SessionId),
  323. put_state(SessionId, S).
  324. put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
  325. S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]),
  326. put_state(SessionId, S).
  327. get_metadata(SessionId, {_MetaKey, Fun}) ->
  328. apply(emqx_persistent_session_ds_state, Fun, [get_state(SessionId)]).
  329. gen_put(SessionId, {_Idx, Fun, Key, Value}) ->
  330. S = apply(emqx_persistent_session_ds_state, Fun, [Key, Value, get_state(SessionId)]),
  331. put_state(SessionId, S).
  332. gen_del(SessionId, {_Idx, Fun, Key}) ->
  333. S = apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]),
  334. put_state(SessionId, S).
  335. gen_get(SessionId, {_Idx, Fun, Key}) ->
  336. apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]).
  337. iterate_sessions(BatchSize) ->
  338. Fun = fun F(It0) ->
  339. case emqx_persistent_session_ds_state:session_iterator_next(It0, BatchSize) of
  340. {[], _} ->
  341. [];
  342. {Sessions, It} ->
  343. Sessions ++ F(It)
  344. end
  345. end,
  346. Fun(emqx_persistent_session_ds_state:make_session_iterator()).
  347. %%================================================================================
  348. %% Misc.
  349. %%================================================================================
  350. update(SessionId, Key, Fun, S) ->
  351. maps:update_with(
  352. SessionId,
  353. fun(SS) ->
  354. setelement(Key, SS, Fun(erlang:element(Key, SS)))
  355. end,
  356. S
  357. ).
  358. get_state(SessionId) ->
  359. case ets:lookup(?tab, SessionId) of
  360. [{_, S}] ->
  361. S;
  362. [] ->
  363. error({not_found, SessionId})
  364. end.
  365. put_state(SessionId, S) ->
  366. ets:insert(?tab, {SessionId, S}).
  367. -ifdef(STORE_STATE_IN_DS).
  368. init() ->
  369. _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
  370. mria:start(),
  371. {ok, _} = application:ensure_all_started(emqx_ds_backends),
  372. Dir = binary_to_list(filename:join(["/tmp", emqx_guid:to_hexstr(emqx_guid:gen())])),
  373. persistent_term:put({?MODULE, data_dir}, Dir),
  374. application:set_env(emqx_durable_storage, db_data_dir, Dir),
  375. Defaults = #{
  376. backend => builtin_local,
  377. append_only => false,
  378. atomic_batches => true,
  379. storage =>
  380. {emqx_ds_storage_bitfield_lts, #{
  381. topic_index_bytes => 4,
  382. epoch_bits => 10,
  383. bits_per_topic_level => 64
  384. }},
  385. n_shards => 16,
  386. n_sites => 1,
  387. replication_factor => 3,
  388. replication_options => #{}
  389. },
  390. ok = emqx_persistent_session_ds_state:open_db(Defaults),
  391. ok.
  392. %% ELSE ifdef(STORE_STATE_IN_DS).
  393. -else.
  394. init() ->
  395. _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
  396. mria:start(),
  397. emqx_persistent_session_ds_state:create_tables().
  398. %% END ifdef(STORE_STATE_IN_DS).
  399. -endif.
  400. -ifdef(STORE_STATE_IN_DS).
  401. clean() ->
  402. ets:delete(?tab),
  403. emqx_ds:drop_db(?DB),
  404. application:stop(emqx_ds_backends),
  405. application:stop(emqx_ds_builtin_local),
  406. mria:stop(),
  407. mria_mnesia:delete_schema(),
  408. Dir = persistent_term:get({?MODULE, data_dir}),
  409. persistent_term:erase({?MODULE, data_dir}),
  410. ok = file:del_dir_r(Dir),
  411. ok.
  412. %% ELSE ifdef(STORE_STATE_IN_DS).
  413. -else.
  414. clean() ->
  415. ets:delete(?tab),
  416. mria:stop(),
  417. mria_mnesia:delete_schema().
  418. %% END ifdef(STORE_STATE_IN_DS).
  419. -endif.