emqx_persistent_session_ds_state_tests.erl 9.7 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_ds_state_tests).
  17. -compile(nowarn_export_all).
  18. -compile(export_all).
  19. -include_lib("proper/include/proper.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -define(tab, ?MODULE).
  22. %%================================================================================
  23. %% Type declarations
  24. %%================================================================================
  %% Model of a single session as tracked by this test suite.
  %%
  %% Note: here `committed' != `dirty'. It means "has been committed at
  %% least once since the creation", and it's used by the iteration
  %% test.
  -record(s, {
      %% Subscriptions, keyed by topic (see put_req/0).
      subs = #{},
      %% Metadata values, keyed by `last_alive_at' / `created_at'.
      metadata = #{},
      %% Streams, keyed by stream id.
      streams = #{},
      %% Sequence numbers, keyed by track.
      seqno = #{},
      %% Set to `true' by the model once the session has been committed.
      committed = false
  }).
  %% Model state of the whole test: session id => model of that session.
  -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
  30. %%================================================================================
  31. %% Properties
  32. %%================================================================================
  %% EUnit test generator: wraps every property of this module in an
  %% assertion and runs them under a 300 second timeout.
  seqno_proper_test_() ->
      QuickcheckOpts = [{numtests, 10}, {to_file, user}, {max_size, 100}],
      Checks = lists:map(
          fun(Prop) -> ?_assert(proper:quickcheck(Prop, QuickcheckOpts)) end,
          [prop_consistency()]
      ),
      {timeout, 300, Checks}.
  %% Property: any generated command sequence, executed against a freshly
  %% initialised environment, finishes with result `ok' (i.e. no
  %% postcondition failed and no command raised).
  %%
  %% The environment created by init/0 is torn down by clean/0 even when
  %% run_commands/2 raises. Otherwise the named ETS table would survive the
  %% failed run, and every following run (including the shrinking runs)
  %% would fail in init/0, hiding the original error.
  prop_consistency() ->
      ?FORALL(
          Cmds,
          commands(?MODULE),
          begin
              init(),
              {_History, State, Result} =
                  try
                      run_commands(?MODULE, Cmds)
                  after
                      clean()
                  end,
              ?WHENFAIL(
                  io:format(
                      user,
                      "Operations: ~p~nState: ~p\nResult: ~p~n",
                      [Cmds, State, Result]
                  ),
                  aggregate(command_names(Cmds), Result =:= ok)
              )
          end
      ).
  55. %%================================================================================
  56. %% Generators
  57. %%================================================================================
  %% Number of distinct session ids used by the generators.
  -define(n_sessions, 10).
  %% Generator: one of the binaries <<"1">> .. <<"?n_sessions">>.
  session_id() ->
      Ids = lists:map(fun erlang:integer_to_binary/1, lists:seq(1, ?n_sessions)),
      oneof(Ids).
  %% Generator: one topic out of a small fixed set.
  topic() ->
      Topics = [<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>],
      oneof(Topics).
  %% Generator: a subscription value; currently always the empty map.
  subscription() ->
      Choices = [#{}],
      oneof(Choices).
  %% Generator: the id of one of the sessions present in the model `S'.
  session_id(S) ->
      KnownIds = maps:keys(S),
      oneof(KnownIds).
  %% Generator: batch size passed to iterate_sessions/1, between 1 and
  %% ?n_sessions inclusive.
  batch_size() ->
      Smallest = 1,
      Largest = ?n_sessions,
      range(Smallest, Largest).
  %% Generator: a metadata write request `{ModelKey, SetterName, Value}',
  %% where `Value' is an integer in 0..100.
  put_metadata() ->
      Value = range(0, 100),
      oneof([
          ?LET(Val, Value, {last_alive_at, set_last_alive_at, Val}),
          ?LET(Val, Value, {created_at, set_created_at, Val})
      ]).
  %% Generator: a metadata read request `{ModelKey, GetterName}'.
  get_metadata() ->
      LastAliveAt = {last_alive_at, get_last_alive_at},
      CreatedAt = {created_at, get_created_at},
      oneof([LastAliveAt, CreatedAt]).
  %% Generator: a sequence-number track, either 0 or 1.
  seqno_track() ->
      FirstTrack = 0,
      LastTrack = 1,
      range(FirstTrack, LastTrack).
  %% Generator: a sequence number between 1 and 100 inclusive.
  seqno() ->
      Lowest = 1,
      Highest = 100,
      range(Lowest, Highest).
  %% Generator: a stream id; the range is degenerate, so it is always 1.
  stream_id() ->
      OnlyId = 1,
      range(OnlyId, OnlyId).
  %% Generator: a stream value; currently always the empty map.
  stream() ->
      Choices = [#{}],
      oneof(Choices).
  %% Generator: a key-value write request
  %% `{RecordFieldIndex, PutFunctionName, Key, Value}'.
  %% The field index selects the `#s{}' field that models the written
  %% collection.
  put_req() ->
      StreamPut =
          ?LET(
              {Id, Stream},
              {stream_id(), stream()},
              {#s.streams, put_stream, Id, Stream}
          ),
      SeqnoPut =
          ?LET(
              {Track, Seqno},
              {seqno_track(), seqno()},
              {#s.seqno, put_seqno, Track, Seqno}
          ),
      SubscriptionPut =
          ?LET(
              {Topic, Subscription},
              {topic(), subscription()},
              {#s.subs, put_subscription, Topic, Subscription}
          ),
      oneof([StreamPut, SeqnoPut, SubscriptionPut]).
  %% Generator: a key-value read request
  %% `{RecordFieldIndex, GetFunctionName, Key}'.
  get_req() ->
      StreamGet = {#s.streams, get_stream, stream_id()},
      SeqnoGet = {#s.seqno, get_seqno, seqno_track()},
      SubscriptionGet = {#s.subs, get_subscription, topic()},
      oneof([StreamGet, SeqnoGet, SubscriptionGet]).
  %% Generator: a key-value delete request
  %% `{RecordFieldIndex, DelFunctionName, Key}'.
  %% Only streams and subscriptions have a delete request here.
  del_req() ->
      StreamDel = {#s.streams, del_stream, stream_id()},
      SubscriptionDel = {#s.subs, del_subscription, topic()},
      oneof([StreamDel, SubscriptionDel]).
  %% proper_statem callback: generates the next symbolic command.
  %%
  %% With an empty model only session creation and iteration are offered,
  %% because every other command needs the id of an existing session.
  command(S) ->
      CreateNew = {1, {call, ?MODULE, create_new, [session_id()]}},
      Iterate = {1, {call, ?MODULE, iterate_sessions, [batch_size()]}},
      case maps:size(S) of
          0 ->
              frequency([CreateNew, Iterate]);
          _ ->
              Existing = session_id(S),
              frequency([
                  %% Global CRUD operations:
                  CreateNew,
                  {1, {call, ?MODULE, delete, [Existing]}},
                  {2, {call, ?MODULE, reopen, [Existing]}},
                  {2, {call, ?MODULE, commit, [Existing]}},
                  %% Metadata:
                  {3, {call, ?MODULE, put_metadata, [Existing, put_metadata()]}},
                  {3, {call, ?MODULE, get_metadata, [Existing, get_metadata()]}},
                  %% Key-value:
                  {3, {call, ?MODULE, gen_put, [Existing, put_req()]}},
                  {3, {call, ?MODULE, gen_get, [Existing, get_req()]}},
                  {3, {call, ?MODULE, gen_del, [Existing, del_req()]}},
                  %% Getters:
                  Iterate
              ])
      end.
  %% proper_statem callback: decides whether a command is valid in model
  %% state `S'.
  %%
  %% `create_new' and `iterate_sessions' are always valid. Every other
  %% command carries the target session id as its first argument and is
  %% valid only while that session exists in the model.
  %%
  %% Generated commands always satisfy this, since their ids are drawn from
  %% the model's keys. The check matters during shrinking: it rejects
  %% candidate sequences where the `create_new' of a session was removed but
  %% later commands still refer to that session.
  precondition(_S, {call, ?MODULE, create_new, [_SessionId]}) ->
      true;
  precondition(_S, {call, ?MODULE, iterate_sessions, [_BatchSize]}) ->
      true;
  precondition(S, {call, ?MODULE, _Op, [SessionId | _Args]}) ->
      maps:is_key(SessionId, S).
  %% proper_statem callback: compares the result of an executed command
  %% with the model state `S' (the state before the command).
  %%
  %% Checks are made with EUnit assertions, so a mismatch raises instead of
  %% returning `false'.
  postcondition(S, {call, ?MODULE, iterate_sessions, [_]}, Result) ->
      {Visited, _} = lists:unzip(Result),
      %% No lingering sessions:
      Known = maps:keys(S),
      ?assertMatch([], Visited -- Known),
      %% All committed sessions are visited by the iterator:
      IsCommitted = fun
          (_Id, #s{committed = true}) -> true;
          (_Id, _Other) -> false
      end,
      Committed = lists:sort(maps:keys(maps:filter(IsCommitted, S))),
      ?assertMatch([], Committed -- Visited),
      true;
  postcondition(S, {call, ?MODULE, get_metadata, [SessionId, {MetaKey, _Fun}]}, Result) ->
      %% The getter must return what the model recorded, or `undefined'.
      #{SessionId := #s{metadata = Metadata}} = S,
      Expected = maps:get(MetaKey, Metadata, undefined),
      ?assertEqual(
          Expected,
          Result,
          #{session_id => SessionId, meta => MetaKey}
      ),
      true;
  postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) ->
      %% `Idx' is the index of the record field that models the collection.
      #{SessionId := Model} = S,
      Expected = maps:get(Key, element(Idx, Model), undefined),
      ?assertEqual(
          Expected,
          Result,
          #{session_id => SessionId, key => Key, 'fun' => Fun}
      ),
      true;
  postcondition(_, _, _) ->
      %% Remaining commands have nothing to verify.
      true.
  %% proper_statem callback: applies the effect of a command to the model.
  %%
  %% - `create_new' installs a pristine `#s{}' for the session.
  %% - `delete' removes the session from the model.
  %% - `put_metadata', `gen_put' and `gen_del' update the map stored in the
  %%   matching record field.
  %% - `commit' and `reopen' mark the session as committed. reopen/1 calls
  %%   emqx_persistent_session_ds_state:commit/1 before opening the session
  %%   again, so it counts as a commit for the iteration check.
  %% - All other commands leave the model untouched.
  next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
      S#{SessionId => #s{}};
  next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
      maps:remove(SessionId, S);
  next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
      update(
          SessionId,
          #s.metadata,
          fun(Map) -> Map#{Key => Val} end,
          S
      );
  next_state(S, _V, {call, ?MODULE, gen_put, [SessionId, {Idx, _Fun, Key, Val}]}) ->
      update(
          SessionId,
          Idx,
          fun(Map) -> Map#{Key => Val} end,
          S
      );
  next_state(S, _V, {call, ?MODULE, gen_del, [SessionId, {Idx, _Fun, Key}]}) ->
      update(
          SessionId,
          Idx,
          fun(Map) -> maps:remove(Key, Map) end,
          S
      );
  next_state(S, _V, {call, ?MODULE, Op, [SessionId]}) when Op =:= commit; Op =:= reopen ->
      update(
          SessionId,
          #s.committed,
          fun(_) -> true end,
          S
      );
  next_state(S, _V, {call, ?MODULE, _, _}) ->
      S.
  %% proper_statem callback: the model starts without any sessions.
  initial_state() ->
      maps:new().
  213. %%================================================================================
  214. %% Operations
  215. %%================================================================================
  %% Command: creates a session state via
  %% emqx_persistent_session_ds_state:create_new/1 and keeps it in the
  %% test's ETS table under `SessionId'.
  create_new(SessionId) ->
      Fresh = emqx_persistent_session_ds_state:create_new(SessionId),
      put_state(SessionId, Fresh).
  %% Command: deletes the session through the module under test, then drops
  %% the state kept for it in the test's ETS table.
  delete(SessionId) ->
      emqx_persistent_session_ds_state:delete(SessionId),
      ets:delete(?tab, SessionId).
  %% Command: commits the kept state of the session and replaces it with
  %% the state returned by the commit.
  commit(SessionId) ->
      Current = get_state(SessionId),
      Committed = emqx_persistent_session_ds_state:commit(Current),
      put_state(SessionId, Committed).
  %% Command: commits the kept state, opens the session again through
  %% emqx_persistent_session_ds_state:open/1 and keeps the opened state.
  %% Opening must succeed; any result other than `{ok, _}' crashes here.
  reopen(SessionId) ->
      Current = get_state(SessionId),
      _ = emqx_persistent_session_ds_state:commit(Current),
      {ok, Reopened} = emqx_persistent_session_ds_state:open(SessionId),
      put_state(SessionId, Reopened).
  %% Command: calls the setter `Fun' of emqx_persistent_session_ds_state
  %% with `Value' and the kept state, then keeps the returned state.
  put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
      Updated = emqx_persistent_session_ds_state:Fun(Value, get_state(SessionId)),
      put_state(SessionId, Updated).
  %% Command: calls the getter `Fun' of emqx_persistent_session_ds_state on
  %% the kept state and returns its result.
  get_metadata(SessionId, {_MetaKey, Fun}) ->
      Current = get_state(SessionId),
      emqx_persistent_session_ds_state:Fun(Current).
  %% Command: calls the put function `Fun' of
  %% emqx_persistent_session_ds_state with `Key', `Value' and the kept
  %% state, then keeps the returned state.
  gen_put(SessionId, {_Idx, Fun, Key, Value}) ->
      Updated = emqx_persistent_session_ds_state:Fun(Key, Value, get_state(SessionId)),
      put_state(SessionId, Updated).
  %% Command: calls the delete function `Fun' of
  %% emqx_persistent_session_ds_state with `Key' and the kept state, then
  %% keeps the returned state.
  gen_del(SessionId, {_Idx, Fun, Key}) ->
      Updated = emqx_persistent_session_ds_state:Fun(Key, get_state(SessionId)),
      put_state(SessionId, Updated).
  %% Command: calls the get function `Fun' of
  %% emqx_persistent_session_ds_state with `Key' and the kept state and
  %% returns its result.
  gen_get(SessionId, {_Idx, Fun, Key}) ->
      Current = get_state(SessionId),
      emqx_persistent_session_ds_state:Fun(Key, Current).
  %% Command: walks the session iterator of the module under test in
  %% batches of `BatchSize' and returns all entries in iteration order.
  %% Iteration stops at the first empty batch.
  iterate_sessions(BatchSize) ->
      Collect = fun Loop(Cursor, Batches) ->
          case emqx_persistent_session_ds_state:session_iterator_next(Cursor, BatchSize) of
              {[], _Exhausted} ->
                  %% Batches were pushed newest-first; restore the order.
                  lists:append(lists:reverse(Batches));
              {Batch, Next} ->
                  Loop(Next, [Batch | Batches])
          end
      end,
      Collect(emqx_persistent_session_ds_state:make_session_iterator(), []).
  250. %%================================================================================
  251. %% Misc.
  252. %%================================================================================
  %% Model helper: replaces element `Key' of the tuple stored under
  %% `SessionId' in the map `S' with `Fun(OldElement)'.
  %% Fails with `{badkey, SessionId}' when the session is not in `S'.
  update(SessionId, Key, Fun, S) ->
      Patch = fun(Model) ->
          Old = erlang:element(Key, Model),
          setelement(Key, Model, Fun(Old))
      end,
      maps:update_with(SessionId, Patch, S).
  %% Returns the session state kept in the test's ETS table for
  %% `SessionId'; raises `{not_found, SessionId}' when there is none.
  get_state(SessionId) ->
      case ets:lookup(?tab, SessionId) of
          [] ->
              error({not_found, SessionId});
          [{_Id, State}] ->
              State
      end.
  %% Stores (or overwrites) the session state kept for `SessionId' in the
  %% test's ETS table.
  put_state(SessionId, S) ->
      Entry = {SessionId, S},
      ets:insert(?tab, Entry).
  %% Per-run setup: creates the named ETS table that holds session states,
  %% starts mria and creates the tables of the module under test.
  init() ->
      TableOpts = [named_table, public, {keypos, 1}],
      _ = ets:new(?tab, TableOpts),
      mria:start(),
      emqx_persistent_session_ds_state:create_tables().
  %% Per-run teardown, mirroring init/0: drops the ETS table, stops mria
  %% and deletes the mnesia schema.
  clean() ->
      ets:delete(?tab),
      mria:stop(),
      mria_mnesia:delete_schema().