emqx_persistent_session_ds_state_tests.erl 11 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_ds_state_tests).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include_lib("proper/include/proper.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -define(tab, ?MODULE).
  22. %%================================================================================
  23. %% Type declarations
  24. %%================================================================================
  25. %% Note: here `committed' != `dirty'. It means "has been committed at
  26. %% least once since the creation", and it's used by the iteration
  27. %% test.
  %% Model of one session as tracked by the test. Field order matters:
  %% the generators and `update/4' address fields by their record index
  %% (`#s.streams', `#s.seqno', ...).
  -record(s, {
      %% {TopicFilter, SubId} => Subscription
      subs = #{},
      %% Metadata key => value
      metadata = #{},
      %% Stream id => stream
      streams = #{},
      %% Seqno track => seqno
      seqno = #{},
      %% Set to `true' by the model once `commit' was called for the session
      committed = false
  }).
  %% Model state of the whole test: session id => model record.
  -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
  30. %%================================================================================
  31. %% Properties
  32. %%================================================================================
  %% EUnit test generator: wraps every property into an `?_assert' that
  %% runs it through `proper:quickcheck/2' (10 tests, max size 100,
  %% output sent to `user'), all under a 300 second EUnit timeout.
  seqno_proper_test_() ->
      Opts = [{numtests, 10}, {to_file, user}, {max_size, 100}],
      ToTest = fun(Prop) -> ?_assert(proper:quickcheck(Prop, Opts)) end,
      {timeout, 300, lists:map(ToTest, [prop_consistency()])}.
  %% Property: for every generated command sequence, running it against
  %% a freshly initialized environment yields the result `ok'.
  %%
  %% The environment is set up by `init/0' before the run and torn down
  %% by `clean/0' right after it. On failure the commands, the final
  %% model state and the result are printed to `user'.
  prop_consistency() ->
      ?FORALL(
          Cmds,
          commands(?MODULE),
          begin
              init(),
              {_History, State, Result} = run_commands(?MODULE, Cmds),
              clean(),
              Passed = Result =:= ok,
              ReportArgs = [Cmds, State, Result],
              ?WHENFAIL(
                  io:format(user, "Operations: ~p~nState: ~p\nResult: ~p~n", ReportArgs),
                  aggregate(command_names(Cmds), Passed)
              )
          end
      ).
  55. %%================================================================================
  56. %% Generators
  57. %%================================================================================
  58. -define(n_sessions, 10).
  %% Generator: one of the session ids `<<"1">>' .. `<<"N">>', where N is
  %% `?n_sessions'.
  session_id() ->
      Ids = lists:map(fun erlang:integer_to_binary/1, lists:seq(1, ?n_sessions)),
      oneof(Ids).
  %% Generator: one of a fixed set of topic filters.
  topic() ->
      Topics = [<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>],
      oneof(Topics).
  %% Generator: subscription id; the only candidate is `[]'.
  subid() ->
      Candidates = [[]],
      oneof(Candidates).
  %% Generator: subscription value; the only candidate is the empty map.
  subscription() ->
      Candidates = [#{}],
      oneof(Candidates).
  %% Generator: id of one of the sessions present in the model state `S'.
  session_id(S) ->
      Existing = maps:keys(S),
      oneof(Existing).
  %% Generator: batch size for the session iterator, between 1 and
  %% `?n_sessions'.
  batch_size() ->
      Smallest = 1,
      Largest = ?n_sessions,
      range(Smallest, Largest).
  %% Generator of metadata write requests: `{MetaKey, SetterFun, Value}',
  %% where `Value' is drawn from 0..100.
  put_metadata() ->
      Value = range(0, 100),
      LastAliveAt = ?LET(Val, Value, {last_alive_at, set_last_alive_at, Val}),
      CreatedAt = ?LET(Val, Value, {created_at, set_created_at, Val}),
      oneof([LastAliveAt, CreatedAt]).
  %% Generator of metadata read requests: `{MetaKey, GetterFun}'.
  get_metadata() ->
      LastAliveAt = {last_alive_at, get_last_alive_at},
      CreatedAt = {created_at, get_created_at},
      oneof([LastAliveAt, CreatedAt]).
  %% Generator: seqno track, either 0 or 1.
  seqno_track() ->
      First = 0,
      Last = 1,
      range(First, Last).
  %% Generator: sequence number between 1 and 100.
  seqno() ->
      Lowest = 1,
      Highest = 100,
      range(Lowest, Highest).
  %% Generator: stream id; the range is 1..1, so it always yields 1.
  stream_id() ->
      OnlyId = 1,
      range(OnlyId, OnlyId).
  %% Generator: stream value; the only candidate is the empty map.
  stream() ->
      Candidates = [#{}],
      oneof(Candidates).
  %% Generator of key-value write requests:
  %% `{RecordFieldIndex, PutFun, Key, Value}'. The field index says which
  %% map of the model record `#s{}' the write is mirrored into.
  put_req() ->
      PutStream = ?LET(
          {Id, Stream},
          {stream_id(), stream()},
          {#s.streams, put_stream, Id, Stream}
      ),
      PutSeqno = ?LET(
          {Track, Seqno},
          {seqno_track(), seqno()},
          {#s.seqno, put_seqno, Track, Seqno}
      ),
      oneof([PutStream, PutSeqno]).
  %% Generator of key-value read requests:
  %% `{RecordFieldIndex, GetFun, Key}'.
  get_req() ->
      GetStream = {#s.streams, get_stream, stream_id()},
      GetSeqno = {#s.seqno, get_seqno, seqno_track()},
      oneof([GetStream, GetSeqno]).
  %% Generator of key-value delete requests:
  %% `{RecordFieldIndex, DelFun, Key}'. Only streams can be deleted.
  del_req() ->
      DelStream = {#s.streams, del_stream, stream_id()},
      oneof([DelStream]).
  %% PropEr command generator.
  %%
  %% With an empty model only `create_new' and `iterate_sessions' can be
  %% generated. Once at least one session exists, the full weighted
  %% command set is available, and every session-targeting command picks
  %% its id from the sessions present in the model `S'.
  command(S) ->
      case maps:size(S) of
          0 ->
              frequency([
                  {1, {call, ?MODULE, create_new, [session_id()]}},
                  {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
              ]);
          _NonEmpty ->
              Existing = session_id(S),
              Call = fun(Name, Args) -> {call, ?MODULE, Name, Args} end,
              frequency([
                  %% Global CRUD operations:
                  {1, Call(create_new, [session_id()])},
                  {1, Call(delete, [Existing])},
                  {2, Call(reopen, [Existing])},
                  {2, Call(commit, [Existing])},
                  %% Subscriptions:
                  {3, Call(put_subscription, [Existing, topic(), subid(), subscription()])},
                  {3, Call(del_subscription, [Existing, topic(), subid()])},
                  %% Metadata:
                  {3, Call(put_metadata, [Existing, put_metadata()])},
                  {3, Call(get_metadata, [Existing, get_metadata()])},
                  %% Key-value:
                  {3, Call(gen_put, [Existing, put_req()])},
                  {3, Call(gen_get, [Existing, get_req()])},
                  {3, Call(gen_del, [Existing, del_req()])},
                  %% Getters:
                  {4, Call(get_subscriptions, [Existing])},
                  {1, Call(iterate_sessions, [batch_size()])}
              ])
      end.
  %% PropEr precondition callback.
  %%
  %% `create_new' and `iterate_sessions' are always allowed. Every other
  %% command takes a session id as its first argument and is valid only
  %% when that session exists in the model state `S'.
  %%
  %% During generation this always holds, because `command/1' draws the
  %% ids from `maps:keys(S)'. The check matters for shrinking: PropEr
  %% removes commands from a failing sequence and relies on the
  %% preconditions to reject sequences that are no longer valid. Without
  %% it, a shrunk sequence may address a session whose `create_new' was
  %% removed, which crashes `update/4' (`maps:update_with/3' on a missing
  %% key) and `get_state/1' (`not_found' error).
  precondition(_S, {call, _Mod, create_new, [_SessionId]}) ->
      true;
  precondition(_S, {call, _Mod, iterate_sessions, [_BatchSize]}) ->
      true;
  precondition(S, {call, _Mod, _Fun, [SessionId | _Args]}) ->
      maps:is_key(SessionId, S).
  %% PropEr postcondition callback: checks the result of a command
  %% against the model state `S'. Violations are raised as EUnit
  %% assertion errors; when all checks pass the clause returns `true'.
  %% Commands without a dedicated clause are always accepted.
  postcondition(S, {call, ?MODULE, iterate_sessions, [_]}, Result) ->
      {Sessions, _} = lists:unzip(Result),
      %% The iterator must not return sessions unknown to the model:
      ?assertMatch([], Sessions -- maps:keys(S)),
      %% Every session marked as committed in the model must be returned:
      IsCommitted = fun
          (_Id, #s{committed = true}) -> true;
          (_Id, _Other) -> false
      end,
      CommittedSessions = lists:sort(maps:keys(maps:filter(IsCommitted, S))),
      ?assertMatch([], CommittedSessions -- Sessions),
      true;
  postcondition(S, {call, ?MODULE, get_metadata, [SessionId, {MetaKey, _Fun}]}, Result) ->
      #{SessionId := #s{metadata = Meta}} = S,
      %% A key that was never written in the model is expected to read
      %% as `undefined':
      Expected = maps:get(MetaKey, Meta, undefined),
      Context = #{session_id => SessionId, meta => MetaKey},
      ?assertEqual(Expected, Result, Context),
      true;
  postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) ->
      #{SessionId := Record} = S,
      %% `Idx' is the index of the model record field mirroring this
      %% key-value collection:
      Expected = maps:get(Key, element(Idx, Record), undefined),
      Context = #{session_id => SessionId, key => Key, 'fun' => Fun},
      ?assertEqual(Expected, Result, Context),
      true;
  postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) ->
      #{SessionId := #s{subs = Subs}} = S,
      %% Same number of subscriptions...
      ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)),
      %% ...and every subscription of the model is found with the same value:
      lists:foreach(
          fun({{TopicFilter, Id}, Expected}) ->
              ?assertEqual(
                  Expected,
                  emqx_topic_gbt:lookup(TopicFilter, Id, Result, default)
              )
          end,
          maps:to_list(Subs)
      ),
      true;
  postcondition(_, _, _) ->
      true.
  %% PropEr state transition callback: applies the effect of a command to
  %% the model state `S'. Commands without a dedicated clause leave the
  %% model unchanged.
  next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
      %% A (re)created session starts from the default model record:
      maps:put(SessionId, #s{}, S);
  next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
      maps:without([SessionId], S);
  next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) ->
      AddSub = fun(Subs) -> maps:put({TopicFilter, SubId}, Subscription, Subs) end,
      update(SessionId, #s.subs, AddSub, S);
  next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) ->
      DropSub = fun(Subs) -> maps:remove({TopicFilter, SubId}, Subs) end,
      update(SessionId, #s.subs, DropSub, S);
  next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
      SetMeta = fun(Map) -> maps:put(Key, Val, Map) end,
      update(SessionId, #s.metadata, SetMeta, S);
  next_state(S, _V, {call, ?MODULE, gen_put, [SessionId, {Idx, _Fun, Key, Val}]}) ->
      %% `Idx' selects the model record field to write into:
      SetValue = fun(Map) -> maps:put(Key, Val, Map) end,
      update(SessionId, Idx, SetValue, S);
  next_state(S, _V, {call, ?MODULE, gen_del, [SessionId, {Idx, _Fun, Key}]}) ->
      DropValue = fun(Map) -> maps:remove(Key, Map) end,
      update(SessionId, Idx, DropValue, S);
  next_state(S, _V, {call, ?MODULE, commit, [SessionId]}) ->
      MarkCommitted = fun(_WasCommitted) -> true end,
      update(SessionId, #s.committed, MarkCommitted, S);
  next_state(S, _V, {call, ?MODULE, _, _}) ->
      S.
  %% PropEr callback: the model starts without any sessions.
  initial_state() ->
      maps:new().
  244. %%================================================================================
  245. %% Operations
  246. %%================================================================================
  %% Creates a session state with
  %% `emqx_persistent_session_ds_state:create_new/1' and stores it in the
  %% test's ETS table under `SessionId'.
  create_new(SessionId) ->
      Fresh = emqx_persistent_session_ds_state:create_new(SessionId),
      put_state(SessionId, Fresh).
  %% Deletes the session through
  %% `emqx_persistent_session_ds_state:delete/1', then drops its entry
  %% from the test's ETS table. Returns the result of `ets:delete/2'.
  delete(SessionId) ->
      _ = emqx_persistent_session_ds_state:delete(SessionId),
      ets:delete(?tab, SessionId).
  %% Commits the stored session state and keeps the state returned by
  %% `emqx_persistent_session_ds_state:commit/1' as the current one.
  commit(SessionId) ->
      Current = get_state(SessionId),
      Committed = emqx_persistent_session_ds_state:commit(Current),
      put_state(SessionId, Committed).
  %% Commits the stored session state (discarding the returned state),
  %% opens the session again with
  %% `emqx_persistent_session_ds_state:open/1' and stores the opened
  %% state. Fails with a `badmatch' unless `open/1' returns `{ok, _}'.
  reopen(SessionId) ->
      Current = get_state(SessionId),
      _ = emqx_persistent_session_ds_state:commit(Current),
      {ok, Reopened} = emqx_persistent_session_ds_state:open(SessionId),
      put_state(SessionId, Reopened).
  %% Adds a subscription to the stored session state and stores the
  %% updated state.
  put_subscription(SessionId, TopicFilter, SubId, Subscription) ->
      Current = get_state(SessionId),
      Updated = emqx_persistent_session_ds_state:put_subscription(
          TopicFilter, SubId, Subscription, Current
      ),
      put_state(SessionId, Updated).
  %% Removes a subscription from the stored session state and stores the
  %% updated state.
  del_subscription(SessionId, TopicFilter, SubId) ->
      Current = get_state(SessionId),
      Updated = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, Current),
      put_state(SessionId, Updated).
  %% Returns the subscriptions of the stored session state, as reported
  %% by `emqx_persistent_session_ds_state:get_subscriptions/1'.
  get_subscriptions(SessionId) ->
      Current = get_state(SessionId),
      emqx_persistent_session_ds_state:get_subscriptions(Current).
  %% Executes a metadata write request `{MetaKey, Fun, Value}': calls
  %% `emqx_persistent_session_ds_state:Fun(Value, State)' and stores the
  %% returned state.
  put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
      Current = get_state(SessionId),
      Updated = emqx_persistent_session_ds_state:Fun(Value, Current),
      put_state(SessionId, Updated).
  %% Executes a metadata read request `{MetaKey, Fun}': returns
  %% `emqx_persistent_session_ds_state:Fun(State)'.
  get_metadata(SessionId, {_MetaKey, Fun}) ->
      Current = get_state(SessionId),
      emqx_persistent_session_ds_state:Fun(Current).
  %% Executes a key-value write request `{Idx, Fun, Key, Value}': calls
  %% `emqx_persistent_session_ds_state:Fun(Key, Value, State)' and stores
  %% the returned state.
  gen_put(SessionId, {_Idx, Fun, Key, Value}) ->
      Current = get_state(SessionId),
      Updated = emqx_persistent_session_ds_state:Fun(Key, Value, Current),
      put_state(SessionId, Updated).
  %% Executes a key-value delete request `{Idx, Fun, Key}': calls
  %% `emqx_persistent_session_ds_state:Fun(Key, State)' and stores the
  %% returned state.
  gen_del(SessionId, {_Idx, Fun, Key}) ->
      Current = get_state(SessionId),
      Updated = emqx_persistent_session_ds_state:Fun(Key, Current),
      put_state(SessionId, Updated).
  %% Executes a key-value read request `{Idx, Fun, Key}': returns
  %% `emqx_persistent_session_ds_state:Fun(Key, State)'.
  gen_get(SessionId, {_Idx, Fun, Key}) ->
      Current = get_state(SessionId),
      emqx_persistent_session_ds_state:Fun(Key, Current).
  %% Walks the session iterator in batches of `BatchSize' until an empty
  %% batch is returned, and returns all collected entries in iteration
  %% order.
  iterate_sessions(BatchSize) ->
      Collect = fun Loop(It0, Batches) ->
          case emqx_persistent_session_ds_state:session_iterator_next(It0, BatchSize) of
              {[], _} ->
                  %% Batches were accumulated newest-first.
                  lists:append(lists:reverse(Batches));
              {Batch, It} ->
                  Loop(It, [Batch | Batches])
          end
      end,
      Collect(emqx_persistent_session_ds_state:make_session_iterator(), []).
  291. %%================================================================================
  292. %% Misc.
  293. %%================================================================================
  %% Applies `Fun' to element number `Key' of the tuple stored under
  %% `SessionId' in the map `S' and returns the map with the updated
  %% tuple. Fails if `SessionId' is not a key of `S' (see
  %% `maps:update_with/3').
  update(SessionId, Key, Fun, S) ->
      Modify = fun(SS) ->
          Old = element(Key, SS),
          setelement(Key, SS, Fun(Old))
      end,
      maps:update_with(SessionId, Modify, S).
  %% Returns the session state stored in the test's ETS table under
  %% `SessionId'; raises `{not_found, SessionId}' when there is no entry.
  get_state(SessionId) ->
      case ets:lookup(?tab, SessionId) of
          [] ->
              error({not_found, SessionId});
          [{_, S}] ->
              S
      end.
  %% Stores the session state `S' in the test's ETS table under
  %% `SessionId'.
  put_state(SessionId, S) ->
      Entry = {SessionId, S},
      ets:insert(?tab, Entry).
  %% Test setup: creates the public named ETS table that holds the
  %% session states, starts mria and creates the tables of
  %% `emqx_persistent_session_ds_state'.
  init() ->
      TabOpts = [named_table, public, {keypos, 1}],
      _ = ets:new(?tab, TabOpts),
      mria:start(),
      emqx_persistent_session_ds_state:create_tables().
  %% Test teardown, counterpart of `init/0': drops the ETS table, stops
  %% mria and deletes the schema via `mria_mnesia:delete_schema/0'.
  clean() ->
      _ = ets:delete(?tab),
      _ = mria:stop(),
      mria_mnesia:delete_schema().