| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_persistent_session_ds_state_tests).
- -compile(nowarn_export_all).
- -compile(export_all).
- -include_lib("proper/include/proper.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -define(tab, ?MODULE).
- %%================================================================================
- %% Type declarations
- %%================================================================================
- %% Note: here `committed' != `dirty'. It means "has been committed at
- %% least once since the creation", and it's used by the iteration
- %% test.
- -record(s, {subs = #{}, metadata = #{}, streams = #{}, seqno = #{}, committed = false}).
- -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
- %%================================================================================
- %% Properties
- %%================================================================================
- seqno_proper_test_() ->
- Props = [prop_consistency()],
- Opts = [{numtests, 10}, {to_file, user}, {max_size, 100}],
- {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
- prop_consistency() ->
- ?FORALL(
- Cmds,
- commands(?MODULE),
- begin
- init(),
- {_History, State, Result} = run_commands(?MODULE, Cmds),
- clean(),
- ?WHENFAIL(
- io:format(
- user,
- "Operations: ~p~nState: ~p\nResult: ~p~n",
- [Cmds, State, Result]
- ),
- aggregate(command_names(Cmds), Result =:= ok)
- )
- end
- ).
- %%================================================================================
- %% Generators
- %%================================================================================
- -define(n_sessions, 10).
- session_id() ->
- oneof([integer_to_binary(I) || I <- lists:seq(1, ?n_sessions)]).
- topic() ->
- oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]).
- subscription() ->
- oneof([#{}]).
- session_id(S) ->
- oneof(maps:keys(S)).
- batch_size() ->
- range(1, ?n_sessions).
- put_metadata() ->
- oneof([
- ?LET(
- Val,
- range(0, 100),
- {last_alive_at, set_last_alive_at, Val}
- ),
- ?LET(
- Val,
- range(0, 100),
- {created_at, set_created_at, Val}
- )
- ]).
- get_metadata() ->
- oneof([
- {last_alive_at, get_last_alive_at},
- {created_at, get_created_at}
- ]).
- seqno_track() ->
- range(0, 1).
- seqno() ->
- range(1, 100).
- stream_id() ->
- range(1, 1).
- stream() ->
- oneof([#{}]).
- put_req() ->
- oneof([
- ?LET(
- {Id, Stream},
- {stream_id(), stream()},
- {#s.streams, put_stream, Id, Stream}
- ),
- ?LET(
- {Track, Seqno},
- {seqno_track(), seqno()},
- {#s.seqno, put_seqno, Track, Seqno}
- ),
- ?LET(
- {Topic, Subscription},
- {topic(), subscription()},
- {#s.subs, put_subscription, Topic, Subscription}
- )
- ]).
- get_req() ->
- oneof([
- {#s.streams, get_stream, stream_id()},
- {#s.seqno, get_seqno, seqno_track()},
- {#s.subs, get_subscription, topic()}
- ]).
- del_req() ->
- oneof([
- {#s.streams, del_stream, stream_id()},
- {#s.subs, del_subscription, topic()}
- ]).
- command(S) ->
- case maps:size(S) > 0 of
- true ->
- frequency([
- %% Global CRUD operations:
- {1, {call, ?MODULE, create_new, [session_id()]}},
- {1, {call, ?MODULE, delete, [session_id(S)]}},
- {2, {call, ?MODULE, reopen, [session_id(S)]}},
- {2, {call, ?MODULE, commit, [session_id(S)]}},
- %% Metadata:
- {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}},
- {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}},
- %% Key-value:
- {3, {call, ?MODULE, gen_put, [session_id(S), put_req()]}},
- {3, {call, ?MODULE, gen_get, [session_id(S), get_req()]}},
- {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}},
- %% Getters:
- {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
- ]);
- false ->
- frequency([
- {1, {call, ?MODULE, create_new, [session_id()]}},
- {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
- ])
- end.
- precondition(_, _) ->
- true.
- postcondition(S, {call, ?MODULE, iterate_sessions, [_]}, Result) ->
- {Sessions, _} = lists:unzip(Result),
- %% No lingering sessions:
- ?assertMatch([], Sessions -- maps:keys(S)),
- %% All committed sessions are visited by the iterator:
- CommittedSessions = lists:sort([K || {K, #s{committed = true}} <- maps:to_list(S)]),
- ?assertMatch([], CommittedSessions -- Sessions),
- true;
- postcondition(S, {call, ?MODULE, get_metadata, [SessionId, {MetaKey, _Fun}]}, Result) ->
- #{SessionId := #s{metadata = Meta}} = S,
- ?assertEqual(
- maps:get(MetaKey, Meta, undefined),
- Result,
- #{session_id => SessionId, meta => MetaKey}
- ),
- true;
- postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) ->
- #{SessionId := Record} = S,
- ?assertEqual(
- maps:get(Key, element(Idx, Record), undefined),
- Result,
- #{session_id => SessionId, key => Key, 'fun' => Fun}
- ),
- true;
- postcondition(_, _, _) ->
- true.
- next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
- S#{SessionId => #s{}};
- next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
- maps:remove(SessionId, S);
- next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
- update(
- SessionId,
- #s.metadata,
- fun(Map) -> Map#{Key => Val} end,
- S
- );
- next_state(S, _V, {call, ?MODULE, gen_put, [SessionId, {Idx, _Fun, Key, Val}]}) ->
- update(
- SessionId,
- Idx,
- fun(Map) -> Map#{Key => Val} end,
- S
- );
- next_state(S, _V, {call, ?MODULE, gen_del, [SessionId, {Idx, _Fun, Key}]}) ->
- update(
- SessionId,
- Idx,
- fun(Map) -> maps:remove(Key, Map) end,
- S
- );
- next_state(S, _V, {call, ?MODULE, commit, [SessionId]}) ->
- update(
- SessionId,
- #s.committed,
- fun(_) -> true end,
- S
- );
- next_state(S, _V, {call, ?MODULE, _, _}) ->
- S.
- initial_state() ->
- #{}.
- %%================================================================================
- %% Operations
- %%================================================================================
- create_new(SessionId) ->
- put_state(SessionId, emqx_persistent_session_ds_state:create_new(SessionId)).
- delete(SessionId) ->
- emqx_persistent_session_ds_state:delete(SessionId),
- ets:delete(?tab, SessionId).
- commit(SessionId) ->
- put_state(SessionId, emqx_persistent_session_ds_state:commit(get_state(SessionId))).
- reopen(SessionId) ->
- _ = emqx_persistent_session_ds_state:commit(get_state(SessionId)),
- {ok, S} = emqx_persistent_session_ds_state:open(SessionId),
- put_state(SessionId, S).
- put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
- S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]),
- put_state(SessionId, S).
- get_metadata(SessionId, {_MetaKey, Fun}) ->
- apply(emqx_persistent_session_ds_state, Fun, [get_state(SessionId)]).
- gen_put(SessionId, {_Idx, Fun, Key, Value}) ->
- S = apply(emqx_persistent_session_ds_state, Fun, [Key, Value, get_state(SessionId)]),
- put_state(SessionId, S).
- gen_del(SessionId, {_Idx, Fun, Key}) ->
- S = apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]),
- put_state(SessionId, S).
- gen_get(SessionId, {_Idx, Fun, Key}) ->
- apply(emqx_persistent_session_ds_state, Fun, [Key, get_state(SessionId)]).
- iterate_sessions(BatchSize) ->
- Fun = fun F(It0) ->
- case emqx_persistent_session_ds_state:session_iterator_next(It0, BatchSize) of
- {[], _} ->
- [];
- {Sessions, It} ->
- Sessions ++ F(It)
- end
- end,
- Fun(emqx_persistent_session_ds_state:make_session_iterator()).
- %%================================================================================
- %% Misc.
- %%================================================================================
- update(SessionId, Key, Fun, S) ->
- maps:update_with(
- SessionId,
- fun(SS) ->
- setelement(Key, SS, Fun(erlang:element(Key, SS)))
- end,
- S
- ).
- get_state(SessionId) ->
- case ets:lookup(?tab, SessionId) of
- [{_, S}] ->
- S;
- [] ->
- error({not_found, SessionId})
- end.
- put_state(SessionId, S) ->
- ets:insert(?tab, {SessionId, S}).
- init() ->
- _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
- mria:start(),
- emqx_persistent_session_ds_state:create_tables().
- clean() ->
- ets:delete(?tab),
- mria:stop(),
- mria_mnesia:delete_schema().
|