|
|
@@ -29,7 +29,10 @@ init_per_suite(Config) ->
|
|
|
<<"backend">> => <<"builtin_raft">>
|
|
|
},
|
|
|
<<"queues">> => #{
|
|
|
- <<"backend">> => <<"builtin_raft">>
|
|
|
+ <<"backend">> => <<"builtin_raft">>,
|
|
|
+ <<"local_write_buffer">> => #{
|
|
|
+ <<"flush_interval">> => <<"10ms">>
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -187,6 +190,44 @@ t_graceful_disconnect(_Config) ->
|
|
|
ok = emqtt:disconnect(ConnShared2),
|
|
|
ok = emqtt:disconnect(ConnPub).
|
|
|
|
|
|
+t_leader_state_preserved(_Config) ->
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ ConnShared1 = emqtt_connect_sub(<<"client1">>),
|
|
|
+ {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/lsp/topic42/#">>, 1),
|
|
|
+
|
|
|
+ ConnShared2 = emqtt_connect_sub(<<"client2">>),
|
|
|
+ {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/lsp/topic42/#">>, 1),
|
|
|
+
|
|
|
+ ConnPub = emqtt_connect_pub(<<"client_pub">>),
|
|
|
+
|
|
|
+ {ok, _} = emqtt:publish(ConnPub, <<"topic42/1/2">>, <<"hello1">>, 1),
|
|
|
+ {ok, _} = emqtt:publish(ConnPub, <<"topic42/3/4">>, <<"hello2">>, 1),
|
|
|
+ ?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000),
|
|
|
+ ?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000),
|
|
|
+
|
|
|
+ ok = emqtt:disconnect(ConnShared1),
|
|
|
+ ok = emqtt:disconnect(ConnShared2),
|
|
|
+
|
|
|
+ %% Equivalent to node restart.
|
|
|
+ ok = terminate_leaders(),
|
|
|
+ ok = timer:sleep(1_000),
|
|
|
+
|
|
|
+ {ok, _} = emqtt:publish(ConnPub, <<"topic42/1/2">>, <<"hello3">>, 1),
|
|
|
+ {ok, _} = emqtt:publish(ConnPub, <<"topic42/3/4">>, <<"hello4">>, 1),
|
|
|
+
|
|
|
+ ConnShared3 = emqtt_connect_sub(<<"client3">>),
|
|
|
+ {ok, _, _} = emqtt:subscribe(ConnShared3, <<"$share/lsp/topic42/#">>, 1),
|
|
|
+
|
|
|
+ ?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000),
|
|
|
+ ?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000),
|
|
|
+
|
|
|
+ ok = emqtt:disconnect(ConnShared3),
|
|
|
+ ok = emqtt:disconnect(ConnPub)
|
|
|
+ end,
|
|
|
+ []
|
|
|
+ ).
|
|
|
+
|
|
|
t_intensive_reassign(_Config) ->
|
|
|
ConnPub = emqtt_connect_pub(<<"client_pub">>),
|
|
|
|