Jelajahi Sumber

Merge pull request #12014 from ieQu1/ds-qos0

Support QoS0 messages in persistent_session_ds
ieQu1 2 tahun lalu
induk
melakukan
e8d18b0e09

+ 3 - 1
apps/emqx/src/emqx_message.erl

@@ -301,7 +301,9 @@ update_expiry(Msg) ->
     Msg.
 
 %% @doc Message to PUBLISH Packet.
--spec to_packet(emqx_types:packet_id(), emqx_types:message()) ->
+%%
+%% When QoS=0 then packet id must be `undefined'
+-spec to_packet(emqx_types:packet_id() | undefined, emqx_types:message()) ->
     emqx_types:packet().
 to_packet(
     PacketId,

+ 7 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -19,7 +19,7 @@
 -include("emqx.hrl").
 
 -export([init/0]).
--export([is_persistence_enabled/0]).
+-export([is_persistence_enabled/0, force_ds/0]).
 
 %% Message persistence
 -export([
@@ -54,6 +54,12 @@ is_persistence_enabled() ->
 storage_backend() ->
     storage_backend(emqx_config:get([session_persistence, storage])).
 
+%% Dev-only option: force all messages to go through
+%% `emqx_persistent_session_ds':
+-spec force_ds() -> boolean().
+force_ds() ->
+    emqx_config:get([session_persistence, force_persistence]).
+
 storage_backend(#{
     builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor}
 }) ->

+ 21 - 16
apps/emqx/src/emqx_persistent_message_ds_replayer.erl

@@ -27,6 +27,7 @@
 -export_type([inflight/0, seqno/0]).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 -include("emqx_persistent_session_ds.hrl").
 
 -ifdef(TEST).
@@ -176,9 +177,12 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
     #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
     ItBegin = get_last_iterator(DSStream, Ranges),
     {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
-    {Publishes, UntilSeqno} = publish(FirstSeqno, Messages),
-    case range_size(FirstSeqno, UntilSeqno) of
-        Size when Size > 0 ->
+    case Messages of
+        [] ->
+            fetch(SessionId, Inflight0, Streams, N, Acc);
+        _ ->
+            {Publishes, UntilSeqno} = publish(FirstSeqno, Messages, _PreserveQoS0 = true),
+            Size = range_size(FirstSeqno, UntilSeqno),
             %% We need to preserve the iterator pointing to the beginning of the
             %% range, so that we can replay it if needed.
             Range0 = #ds_pubrange{
@@ -197,9 +201,7 @@ fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
                 next_seqno = UntilSeqno,
                 offset_ranges = Ranges ++ [Range]
             },
-            fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]);
-        0 ->
-            fetch(SessionId, Inflight0, Streams, N, Acc)
+            fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc])
     end;
 fetch(_SessionId, Inflight, _Streams, _N, Acc) ->
     Publishes = lists:append(lists:reverse(Acc)),
@@ -268,7 +270,7 @@ replay_range(
         end,
     MessagesReplay = [emqx_message:set_flag(dup, true, Msg) || Msg <- MessagesUnacked],
     %% Asserting that range is consistent with the message storage state.
-    {Replies, Until} = publish(FirstUnacked, MessagesReplay),
+    {Replies, Until} = publish(FirstUnacked, MessagesReplay, _PreserveQoS0 = false),
     %% Again, we need to keep the iterator pointing past the end of the
     %% range, so that we can pick up where we left off.
     Range = Range0#ds_pubrange{iterator = ItNext},
@@ -276,15 +278,18 @@ replay_range(
 replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) ->
     {Range0, Acc}.
 
-publish(FirstSeqno, Messages) ->
-    lists:mapfoldl(
-        fun(Message, Seqno) ->
-            PacketId = seqno_to_packet_id(Seqno),
-            {{PacketId, Message}, next_seqno(Seqno)}
-        end,
-        FirstSeqno,
-        Messages
-    ).
+publish(FirstSeqNo, Messages, PreserveQos0) ->
+    do_publish(FirstSeqNo, Messages, PreserveQos0, []).
+
+do_publish(SeqNo, [], _, Acc) ->
+    {lists:reverse(Acc), SeqNo};
+do_publish(SeqNo, [#message{qos = 0} | Messages], false, Acc) ->
+    do_publish(SeqNo, Messages, false, Acc);
+do_publish(SeqNo, [#message{qos = 0} = Message | Messages], true, Acc) ->
+    do_publish(SeqNo, Messages, true, [{undefined, Message} | Acc]);
+do_publish(SeqNo, [Message | Messages], PreserveQos0, Acc) ->
+    PacketId = seqno_to_packet_id(SeqNo),
+    do_publish(next_seqno(SeqNo), Messages, PreserveQos0, [{PacketId, Message} | Acc]).
 
 -spec preserve_range(ds_pubrange()) -> ok.
 preserve_range(Range = #ds_pubrange{type = inflight}) ->

+ 5 - 5
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -274,12 +274,12 @@ get_subscription(TopicFilter, #{subscriptions := Subs}) ->
 %%--------------------------------------------------------------------
 
 -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) ->
-    {ok, emqx_types:publish_result(), replies(), session()}
+    {ok, emqx_types:publish_result(), session()}
     | {error, emqx_types:reason_code()}.
 publish(_PacketId, Msg, Session) ->
     %% TODO: QoS2
     Result = emqx_broker:publish(Msg),
-    {ok, Result, [], Session}.
+    {ok, Result, Session}.
 
 %%--------------------------------------------------------------------
 %% Client -> Broker: PUBACK
@@ -338,7 +338,7 @@ pubcomp(_ClientInfo, _PacketId, _Session = #{}) ->
 -spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
     {ok, replies(), session()}.
 deliver(_ClientInfo, _Delivers, Session) ->
-    %% TODO: QoS0 and system messages end up here.
+    %% TODO: system messages end up here.
     {ok, [], Session}.
 
 -spec handle_timeout(clientinfo(), _Timeout, session()) ->
@@ -349,11 +349,11 @@ handle_timeout(
     Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
 ) ->
     {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, ReceiveMaximum),
-    %% TODO: make these values configurable:
+    IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]),
     Timeout =
         case Publishes of
             [] ->
-                100;
+                IdlePollInterval;
             [_ | _] ->
                 0
         end,

+ 17 - 0
apps/emqx/src/emqx_schema.erl

@@ -1772,6 +1772,23 @@ fields("session_persistence") ->
                         <<"builtin">> => #{}
                     }
                 }
+            )},
+        {"idle_poll_interval",
+            sc(
+                timeout_duration(),
+                #{
+                    default => <<"100ms">>,
+                    desc => ?DESC(session_ds_idle_poll_interval)
+                }
+            )},
+        {"force_persistence",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    %% Only for testing, shall remain hidden
+                    importance => ?IMPORTANCE_HIDDEN
+                }
             )}
     ];
 fields("session_storage_backend") ->

+ 12 - 6
apps/emqx/src/emqx_session.erl

@@ -626,12 +626,18 @@ choose_impl_candidates(#{expiry_interval := EI}) ->
 choose_impl_candidates(_, _IsPSStoreEnabled = false) ->
     [emqx_session_mem];
 choose_impl_candidates(0, _IsPSStoreEnabled = true) ->
-    %% NOTE
-    %% If ExpiryInterval is 0, the natural choice is `emqx_session_mem`. Yet we still
-    %% need to look the existing session up in the `emqx_persistent_session_ds` store
-    %% first, because previous connection may have set ExpiryInterval to a non-zero
-    %% value.
-    [emqx_session_mem, emqx_persistent_session_ds];
+    case emqx_persistent_message:force_ds() of
+        false ->
+            %% NOTE
+            %% If ExpiryInterval is 0, the natural choice is
+            %% `emqx_session_mem'. Yet we still need to look the
+            %% existing session up in the `emqx_persistent_session_ds'
+            %% store first, because previous connection may have set
+            %% ExpiryInterval to a non-zero value.
+            [emqx_session_mem, emqx_persistent_session_ds];
+        true ->
+            [emqx_persistent_session_ds]
+    end;
 choose_impl_candidates(EI, _IsPSStoreEnabled = true) when EI > 0 ->
     [emqx_persistent_session_ds].
 

+ 53 - 1
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -233,6 +233,56 @@ t_session_subscription_iterators(Config) ->
     ),
     ok.
 
+t_qos0(Config) ->
+    Sub = connect(<<?MODULE_STRING "1">>, true, 30),
+    Pub = connect(<<?MODULE_STRING "2">>, true, 0),
+    try
+        {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
+
+        Messages = [
+            {<<"t/1">>, <<"1">>, 0},
+            {<<"t/1">>, <<"2">>, 1},
+            {<<"t/1">>, <<"3">>, 0}
+        ],
+        [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages],
+        ?assertMatch(
+            [
+                #{qos := 0, topic := <<"t/1">>, payload := <<"1">>},
+                #{qos := 1, topic := <<"t/1">>, payload := <<"2">>},
+                #{qos := 0, topic := <<"t/1">>, payload := <<"3">>}
+            ],
+            receive_messages(3)
+        )
+    after
+        emqtt:stop(Sub),
+        emqtt:stop(Pub)
+    end.
+
+t_publish_as_persistent(Config) ->
+    Sub = connect(<<?MODULE_STRING "1">>, true, 30),
+    Pub = connect(<<?MODULE_STRING "2">>, true, 30),
+    try
+        {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1),
+        Messages = [
+            {<<"t/1">>, <<"1">>, 0},
+            {<<"t/1">>, <<"2">>, 1},
+            {<<"t/1">>, <<"3">>, 2}
+        ],
+        [emqtt:publish(Pub, Topic, Payload, Qos) || {Topic, Payload, Qos} <- Messages],
+        ?assertMatch(
+            [
+                #{qos := 0, topic := <<"t/1">>, payload := <<"1">>},
+                #{qos := 1, topic := <<"t/1">>, payload := <<"2">>}
+                %% TODO: QoS 2
+                %% #{qos := 2, topic := <<"t/1">>, payload := <<"3">>}
+            ],
+            receive_messages(3)
+        )
+    after
+        emqtt:stop(Sub),
+        emqtt:stop(Pub)
+    end.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -273,7 +323,7 @@ consume(It) ->
     end.
 
 receive_messages(Count) ->
-    receive_messages(Count, []).
+    lists:reverse(receive_messages(Count, [])).
 
 receive_messages(0, Msgs) ->
     Msgs;
@@ -307,4 +357,6 @@ get_mqtt_port(Node, Type) ->
 
 clear_db() ->
     ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
+    mria:stop(),
+    ok = mnesia:delete_schema([node()]),
     ok.

+ 12 - 6
apps/emqx_durable_storage/src/emqx_ds_lts.erl

@@ -119,7 +119,7 @@ trie_restore(Options, Dump) ->
     Trie.
 
 %% @doc Lookup the topic key. Create a new one, if not found.
--spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key().
+-spec topic_key(trie(), threshold_fun(), [binary() | '']) -> msg_storage_key().
 topic_key(Trie, ThresholdFun, Tokens) ->
     do_topic_key(Trie, ThresholdFun, 0, ?PREFIX, Tokens, []).
 
@@ -363,12 +363,12 @@ emanating(#trie{trie = Tab}, State, ?EOT) ->
         [#trans{next = Next}] -> [{?EOT, Next}];
         [] -> []
     end;
-emanating(#trie{trie = Tab}, State, Bin) when is_binary(Bin) ->
+emanating(#trie{trie = Tab}, State, Token) when is_binary(Token); Token =:= '' ->
     [
         {Edge, Next}
      || #trans{key = {_, Edge}, next = Next} <-
             ets:lookup(Tab, {State, ?PLUS}) ++
-                ets:lookup(Tab, {State, Bin})
+                ets:lookup(Tab, {State, Token})
     ].
 
 %%================================================================================
@@ -533,6 +533,7 @@ topic_match_test() ->
         {S11, []} = test_key(T, ThresholdFun, [1, 1]),
         {S12, []} = test_key(T, ThresholdFun, [1, 2]),
         {S111, []} = test_key(T, ThresholdFun, [1, 1, 1]),
+        {S11e, []} = test_key(T, ThresholdFun, [1, 1, '']),
         %% Match concrete topics:
         assert_match_topics(T, [1], [{S1, []}]),
         assert_match_topics(T, [1, 1], [{S11, []}]),
@@ -540,14 +541,16 @@ topic_match_test() ->
         %% Match topics with +:
         assert_match_topics(T, [1, '+'], [{S11, []}, {S12, []}]),
         assert_match_topics(T, [1, '+', 1], [{S111, []}]),
+        assert_match_topics(T, [1, '+', ''], [{S11e, []}]),
         %% Match topics with #:
         assert_match_topics(T, [1, '#'],
                             [{S1, []},
                              {S11, []}, {S12, []},
-                             {S111, []}]),
+                             {S111, []}, {S11e, []}]),
         assert_match_topics(T, [1, 1, '#'],
                             [{S11, []},
-                             {S111, []}]),
+                             {S111, []},
+                             {S11e, []}]),
         %% Now add learned wildcards:
         {S21, []} = test_key(T, ThresholdFun, [2, 1]),
         {S22, []} = test_key(T, ThresholdFun, [2, 2]),
@@ -587,7 +590,10 @@ assert_match_topics(Trie, Filter0, Expected) ->
 
 %% erlfmt-ignore
 test_key(Trie, Threshold, Topic0) ->
-    Topic = [integer_to_binary(I) || I <- Topic0],
+    Topic = lists:map(fun('') -> '';
+                         (I) -> integer_to_binary(I)
+                      end,
+                      Topic0),
     Ret = topic_key(Trie, Threshold, Topic),
     %% Test idempotency:
     Ret1 = topic_key(Trie, Threshold, Topic),

+ 32 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -34,7 +34,8 @@
     drop_db/1,
     shard_leader/2,
     this_site/0,
-    set_leader/3
+    set_leader/3,
+    print_status/0
 ]).
 
 %% gen_server
@@ -100,6 +101,35 @@
 %% API funcions
 %%================================================================================
 
+-spec print_status() -> ok.
+print_status() ->
+    io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]),
+    io:format("~nSITES:~n", []),
+    Nodes = [node() | nodes()],
+    lists:foreach(
+        fun(#?NODE_TAB{site = Site, node = Node}) ->
+            Status =
+                case lists:member(Node, Nodes) of
+                    true -> up;
+                    false -> down
+                end,
+            io:format("~s    ~p    ~p~n", [base64:encode(Site), Node, Status])
+        end,
+        eval_qlc(mnesia:table(?NODE_TAB))
+    ),
+    io:format("~nSHARDS~n", []),
+    lists:foreach(
+        fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) ->
+            Status =
+                case lists:member(Leader, Nodes) of
+                    true -> up;
+                    false -> down
+                end,
+            io:format("~p/~s    ~p    ~p~n", [DB, Shard, Leader, Status])
+        end,
+        eval_qlc(mnesia:table(?SHARD_TAB))
+    ).
+
 -spec this_site() -> site().
 this_site() ->
     persistent_term:get(?emqx_ds_builtin_site).
@@ -297,6 +327,7 @@ ensure_site() ->
             ok;
         _ ->
             Site = crypto:strong_rand_bytes(8),
+            logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
             ok = filelib:ensure_dir(Filename),
             {ok, FD} = file:open(Filename, [write]),
             io:format(FD, "~p.", [Site]),