@@ -25,8 +25,6 @@
 -include("../../emqx/include/asserts.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").

--define(N_SHARDS, 1).
-
 opts(Config) ->
     proplists:get_value(ds_conf, Config).

@@ -55,33 +53,108 @@ t_01_smoke_store(Config) ->
     ).

 %% A simple smoke test that verifies that getting the list of streams
-%% doesn't crash and that iterators can be opened.
-t_02_smoke_get_streams_start_iter(Config) ->
-    DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
-    StartTime = 0,
-    TopicFilter = ['#'],
-    [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-    ?assertMatch({_, _}, Rank),
-    ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)).
-
-%% A simple smoke test that verifies that it's possible to iterate
+%% doesn't crash, that iterators can be opened, and that it's possible to iterate
 %% over messages.
-t_03_smoke_iterate(Config) ->
+t_02_smoke_iterate(Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
     StartTime = 0,
     TopicFilter = ['#'],
     Msgs = [
         message(<<"foo/bar">>, <<"1">>, 0),
-        message(<<"foo">>, <<"2">>, 1),
-        message(<<"bar/bar">>, <<"3">>, 2)
+        message(<<"foo/bar">>, <<"2">>, 1),
+        message(<<"foo/bar">>, <<"3">>, 2)
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})),
+    timer:sleep(1000),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
-    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
+    emqx_ds_test_helpers:diff_messages(Msgs, Batch).
+
+%% A simple smoke test that verifies that a poll request is fulfilled
+%% immediately when new data is already present at the time of the
+%% poll request.
+t_03_smoke_poll_immediate(Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
+            %% Store one message to create a stream:
+            Msgs1 = [message(<<"foo/bar">>, <<"0">>, 0)],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1, #{sync => true})),
+            timer:sleep(1_000),
+            %% Create the iterator:
+            StartTime = 0,
+            TopicFilter = ['#'],
+            [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+            {ok, Iter1, Batch1} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+            emqx_ds_test_helpers:diff_messages(Msgs1, Batch1),
+            %% Publish some messages:
+            Msgs2 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/bar">>, <<"2">>, 1),
+                message(<<"foo/bar">>, <<"3">>, 2)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs2, #{sync => true})),
+            timer:sleep(1_000),
+            %% Now poll the iterator:
+            UserData = ?FUNCTION_NAME,
+            {ok, Ref} = emqx_ds:poll(DB, [{UserData, Iter1}], #{timeout => 5_000}),
+            receive
+                #poll_reply{ref = Ref, userdata = UserData, payload = Payload} ->
+                    {ok, Iter, Batch2} = Payload,
+                    emqx_ds_test_helpers:diff_messages(Msgs2, Batch2),
+                    %% Now verify that the received iterator is valid:
+                    ?assertMatch({ok, _, []}, emqx_ds:next(DB, Iter, 10))
+            after 1_000 ->
+                error(no_poll_reply)
+            end
+        end,
+        []
+    ).
+
+%% A simple test that verifies that a poll request is fulfilled after
+%% new data is added to the stream.
+t_03_smoke_poll_new_data(Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
+            %% Store one message to create a stream:
+            Msgs1 = [message(<<"foo/bar">>, <<"0">>, 0)],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1, #{sync => true})),
+            timer:sleep(1_000),
+            %% Create the iterator:
+            StartTime = 0,
+            TopicFilter = ['#'],
+            [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+            {ok, Iter1, Batch1} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+            emqx_ds_test_helpers:diff_messages(Msgs1, Batch1),
+            %% Now poll the iterator:
+            UserData = ?FUNCTION_NAME,
+            {ok, Ref} = emqx_ds:poll(DB, [{UserData, Iter1}], #{timeout => 5_000}),
+            %% Publish some messages:
+            Msgs2 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/bar">>, <<"2">>, 1),
+                message(<<"foo/bar">>, <<"3">>, 2)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs2, #{sync => true})),
+            receive
+                #poll_reply{ref = Ref, userdata = UserData, payload = Payload} ->
+                    {ok, Iter, Batch2} = Payload,
+                    emqx_ds_test_helpers:diff_messages(Msgs2, Batch2),
+                    %% Now verify that the received iterator is valid:
+                    ?assertMatch({ok, _, []}, emqx_ds:next(DB, Iter, 10))
+            after 5_000 ->
+                error(no_poll_reply)
+            end
+        end,
+        []
+    ).

 %% Verify that iterators survive restart of the application. This is
 %% an important property, since the lifetime of the iterators is tied
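
Both the rewritten t_02_smoke_iterate and the new poll tests drain iterators through emqx_ds_test_helpers:consume_iter/2, whose implementation is not part of this change. A helper with that contract can be sketched on top of emqx_ds:next/3, which the tests above already call directly; the consume_all name, the batch size of 100, and the exact accumulation are assumptions for illustration only:

    %% Sketch only: drain an iterator via emqx_ds:next/3, assuming replies of
    %% the form {ok, NextIt, Batch} and that an empty batch means "no more
    %% data for now".
    consume_all(DB, It) ->
        consume_all(DB, It, []).

    consume_all(DB, It0, Acc) ->
        case emqx_ds:next(DB, It0, 100) of
            {ok, It, []} ->
                {ok, It, lists:append(lists:reverse(Acc))};
            {ok, It, Batch} ->
                consume_all(DB, It, [Batch | Acc]);
            Error ->
                Error
        end.
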
@@ -233,6 +306,7 @@ t_08_smoke_list_drop_generation(Config) ->
     ok.

 t_09_atomic_store_batch(Config) ->
+    ct:pal("store batch ~p", [Config]),
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
@@ -686,38 +760,39 @@ delete(DB, It0, Selector, BatchSize, Acc) ->
 %% CT callbacks

 all() ->
-    [{group, builtin_local}, {group, builtin_raft}].
+    [{group, Backend} || Backend <- backends()].
+
+exclude(emqx_ds_fdb_backend) ->
+    [
+        t_09_atomic_store_batch,
+        t_11_batch_preconditions,
+        t_12_batch_precondition_conflicts,
+        t_smoke_delete_next
+    ];
+exclude(_) ->
+    [].

 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
+    [{Backend, TCs -- exclude(Backend)} || Backend <- backends()].
+
+init_per_group(emqx_fdb_ds, _Config) ->
+    {skip, fixme};
+init_per_group(emqx_ds_builtin_raft, Config) ->
+    %% The Raft backend is an odd one, as its main module is named
+    %% `emqx_ds_replication_layer' for historical reasons:
     [
-        {builtin_local, TCs},
-        {builtin_raft, TCs}
+        {backend, emqx_ds_replication_layer},
+        {ds_conf, emqx_ds_replication_layer:test_db_config(Config)}
+        | Config
+    ];
+init_per_group(Backend, Config) ->
+    [
+        {backend, Backend},
+        {ds_conf, Backend:test_db_config(Config)}
+        | Config
     ].

-init_per_group(builtin_local, Config) ->
-    Conf = #{
-        backend => builtin_local,
-        storage => {emqx_ds_storage_reference, #{}},
-        n_shards => ?N_SHARDS
-    },
-    [{ds_conf, Conf} | Config];
-init_per_group(builtin_raft, Config) ->
-    case emqx_ds_test_helpers:skip_if_norepl() of
-        false ->
-            Conf = #{
-                backend => builtin_raft,
-                storage => {emqx_ds_storage_reference, #{}},
-                n_shards => ?N_SHARDS,
-                n_sites => 1,
-                replication_factor => 3,
-                replication_options => #{}
-            },
-            [{ds_conf, Conf} | Config];
-        Yes ->
-            Yes
-    end.
-
 end_per_group(_Group, Config) ->
     Config.

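The reworked CT callbacks treat every entry returned by backends() as a callback module that is expected to export test_db_config/1 (consumed through the ds_conf key and opts/1) and test_applications/1 (consumed by init_per_testcase/2 in the next hunk). A minimal, purely hypothetical backend module satisfying that assumed contract could look like this; the module name and option values are illustrative, loosely modeled on the removed builtin_local configuration:

    %% Hypothetical example of the per-backend callback contract assumed by
    %% this suite; the module name and values are placeholders.
    -module(my_ds_backend).

    -export([test_db_config/1, test_applications/1]).

    %% DB options that the suite passes to emqx_ds_open_db/2 via opts/1:
    test_db_config(_Config) ->
        #{
            backend => my_ds_backend,
            storage => {emqx_ds_storage_reference, #{}},
            n_shards => 1
        }.

    %% Applications that init_per_testcase/2 starts via emqx_cth_suite:start/2:
    test_applications(_Config) ->
        [emqx_durable_storage, emqx_ds_backends].
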
@@ -731,17 +806,18 @@ suite() ->
     [{timetrap, 50_000}].

 init_per_testcase(TC, Config) ->
-    Apps = emqx_cth_suite:start(
-        [emqx_durable_storage, emqx_ds_backends],
-        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
-    ),
-    ct:pal("Apps: ~p", [Apps]),
-    [{apps, Apps} | Config].
+    Backend = proplists:get_value(backend, Config),
+    Apps = emqx_cth_suite:start(Backend:test_applications(Config), #{
+        work_dir => emqx_cth_suite:work_dir(TC, Config)
+    }),
+    ct:pal("Started apps: ~p", [Apps]),
+    timer:sleep(1000),
+    Config ++ [{apps, Apps}].

 end_per_testcase(TC, Config) ->
-    ok = emqx_ds:drop_db(TC),
-    ok = emqx_cth_suite:stop(?config(apps, Config)),
-    _ = mnesia:delete_schema([node()]),
+    catch ok = emqx_ds:drop_db(TC),
+    catch ok = emqx_cth_suite:stop(?config(apps, Config)),
+    catch mnesia:delete_schema([node()]),
     snabbkaffe:stop(),
     ok.

@@ -750,3 +826,8 @@ emqx_ds_open_db(X1, X2) ->
         ok -> timer:sleep(1000);
         Other -> Other
     end.
+
+backends() ->
+    application:load(emqx_ds_backends),
+    {ok, L} = application:get_env(emqx_ds_backends, available_backends),
+    L.
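
backends/0 discovers the backend modules from the available_backends key of the emqx_ds_backends application environment. Assuming that key is just a list of callback modules, it could be overridden for a local run before starting the suite; the module names below are placeholders, not a statement about which backends ship with emqx_ds_backends:

    %% Illustration only: point backends/0 at two placeholder modules by
    %% setting the application environment it reads.
    _ = application:load(emqx_ds_backends),
    ok = application:set_env(emqx_ds_backends, available_backends, [
        emqx_ds_builtin_local,
        emqx_ds_builtin_raft
    ]).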