Просмотр исходного кода

fix(ds): Add unique ID to the key

ieQu1 2 лет назад
Родитель
Сommit
74cb43f8b1

+ 8 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -115,6 +115,7 @@
     session().
 create(#{clientid := ClientID}, _ConnInfo, Conf) ->
     % TODO: expiration
+    ensure_timers(),
     ensure_session(ClientID, Conf).
 
 -spec open(clientinfo(), conninfo()) ->
@@ -127,10 +128,9 @@ open(#{clientid := ClientID}, _ConnInfo) ->
     %% somehow isolate those idling not-yet-expired sessions into a separate process
     %% space, and move this call back into `emqx_cm` where it belongs.
     ok = emqx_cm:discard_session(ClientID),
-    ensure_timer(pull),
-    ensure_timer(get_streams),
     case open_session(ClientID) of
         Session = #{} ->
+            ensure_timers(),
             {true, Session, []};
         false ->
             false
@@ -705,6 +705,12 @@ export_record(Record, I, [Field | Rest], Acc) ->
 export_record(_, _, [], Acc) ->
     Acc.
 
+%% TODO: find a more reliable way to perform actions that have side
+%% effects. Add `CBM:init' callback to the session behavior?
+ensure_timers() ->
+    ensure_timer(pull),
+    ensure_timer(get_streams).
+
 -spec ensure_timer(pull | get_streams) -> ok.
 ensure_timer(Type) ->
     _ = emqx_utils:start_timer(100, {emqx_session, Type}),

+ 10 - 8
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -26,9 +26,6 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
--define(DEFAULT_KEYSPACE, default).
--define(DS_SHARD_ID, <<"local">>).
--define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
 -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
 
 all() ->
@@ -49,6 +46,7 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
 init_per_testcase(TestCase, Config) ->
+    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     Apps = emqx_cth_suite:start(
         app_specs(),
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
@@ -59,9 +57,9 @@ end_per_testcase(t_session_subscription_iterators, Config) ->
     Nodes = ?config(nodes, Config),
     emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
-    ok;
+    end_per_testcase(common, Config);
 end_per_testcase(_TestCase, Config) ->
-    Apps = ?config(apps, Config),
+    Apps = proplists:get_value(apps, Config, []),
     emqx_common_test_helpers:call_janitor(60_000),
     clear_db(),
     emqx_cth_suite:stop(Apps),
@@ -97,6 +95,7 @@ t_messages_persisted(_Config) ->
     Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages],
 
     ct:pal("Results = ~p", [Results]),
+    timer:sleep(2000),
 
     Persisted = consume(['#'], 0),
 
@@ -141,6 +140,8 @@ t_messages_persisted_2(_Config) ->
     {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} =
         emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1),
 
+    timer:sleep(2000),
+
     Persisted = consume(['#'], 0),
 
     ct:pal("Persisted = ~p", [Persisted]),
@@ -251,13 +252,14 @@ connect(Opts0 = #{}) ->
     {ok, _} = emqtt:connect(Client),
     Client.
 
-consume(TopicFiler, StartMS) ->
+consume(TopicFilter, StartMS) ->
+    Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS),
     lists:flatmap(
         fun({_Rank, Stream}) ->
-            {ok, It} = emqx_ds:make_iterator(Stream, StartMS, 0),
+            {ok, It} = emqx_ds:make_iterator(Stream, TopicFilter, StartMS),
             consume(It)
         end,
-        emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS)
+        Streams
     ).
 
 consume(It) ->

+ 8 - 5
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -24,6 +24,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+
 %%--------------------------------------------------------------------
 %% SUITE boilerplate
 %%--------------------------------------------------------------------
@@ -131,6 +133,7 @@ get_listener_port(Type, Name) ->
 end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
     ok = emqx_cth_suite:stop(?config(group_apps, Config));
 end_per_group(_, _Config) ->
+    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     ok.
 
 init_per_testcase(TestCase, Config) ->
@@ -188,7 +191,7 @@ receive_messages(Count, Msgs) ->
             receive_messages(Count - 1, [Msg | Msgs]);
         _Other ->
             receive_messages(Count, Msgs)
-    after 5000 ->
+    after 15000 ->
         Msgs
     end.
 
@@ -227,11 +230,11 @@ wait_for_cm_unregister(ClientId, N) ->
     end.
 
 publish(Topic, Payloads) ->
-    publish(Topic, Payloads, false).
+    publish(Topic, Payloads, false, 2).
 
-publish(Topic, Payloads, WaitForUnregister) ->
+publish(Topic, Payloads, WaitForUnregister, QoS) ->
     Fun = fun(Client, Payload) ->
-        {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
+        {ok, _} = emqtt:publish(Client, Topic, Payload, QoS)
     end,
     do_publish(Payloads, Fun, WaitForUnregister).
 
@@ -532,7 +535,7 @@ t_publish_while_client_is_gone_qos1(Config) ->
     ok = emqtt:disconnect(Client1),
     maybe_kill_connection_process(ClientId, Config),
 
-    ok = publish(Topic, [Payload1, Payload2]),
+    ok = publish(Topic, [Payload1, Payload2], false, 1),
 
     {ok, Client2} = emqtt:start_link([
         {proto_ver, v5},

+ 19 - 0
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -0,0 +1,19 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_DS_HRL_HRL).
+-define(EMQX_DS_HRL_HRL, true).
+
+-endif.

+ 1 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -111,6 +111,7 @@ open_db(DB, Opts = #{backend := builtin}) ->
     emqx_ds_replication_layer:open_db(DB, Opts).
 
 %% @doc TODO: currently if one or a few shards are down, they won't be
+
 %% deleted.
 -spec drop_db(db()) -> ok.
 drop_db(DB) ->

+ 3 - 2
apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl

@@ -214,7 +214,7 @@ vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) ->
 bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) ->
     Vec = lists:zipwith(
         fun(Bin, SizeOf) ->
-            <<Int:SizeOf, _/binary>> = Bin,
+            <<Int:SizeOf>> = Bin,
             Int
         end,
         Binaries,
@@ -402,7 +402,8 @@ bin_increment(
     Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter, range_max = RangeMax},
     KeyBin
 ) ->
-    <<Key0:Size>> = KeyBin,
+    %% The key may contain random suffix, skip it:
+    <<Key0:Size, _/binary>> = KeyBin,
     Key1 = Key0 + 1,
     if
         Key1 band Bitmask =:= Bitfilter, Key1 =< RangeMax ->

+ 11 - 4
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -206,7 +206,10 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime,
     %% Make filter:
     Inequations = [
         {'=', TopicIndex},
-        {StartTime, '..', SafeCutoffTime - 1}
+        {StartTime, '..', SafeCutoffTime - 1},
+        %% Unique integer:
+        any
+        %% Varying topic levels:
         | lists:map(
             fun
                 ('+') ->
@@ -337,9 +340,12 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
 ]) ->
     binary().
 make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
+    UniqueInteger = erlang:unique_integer([monotonic, positive]),
     emqx_ds_bitmask_keymapper:key_to_bitstring(
         KeyMapper,
-        emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [TopicIndex, Timestamp | Varying])
+        emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
+            TopicIndex, Timestamp, UniqueInteger | Varying
+        ])
     ).
 
 %% TODO: don't hardcode the thresholds
@@ -366,9 +372,10 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
     %% Dimension Offset        Bitsize
         [{1,     0,            TopicIndexBytes * ?BYTE_SIZE},      %% Topic index
          {2,     TSOffsetBits, TSBits - TSOffsetBits       }] ++   %% Timestamp epoch
-        [{2 + I, 0,            BitsPerTopicLevel           }       %% Varying topic levels
+        [{3 + I, 0,            BitsPerTopicLevel           }       %% Varying topic levels
                                                            || I <- lists:seq(1, N)] ++
-        [{2,     0,            TSOffsetBits                }],     %% Timestamp offset
+        [{2,     0,            TSOffsetBits                },      %% Timestamp offset
+         {3,     0,            64                          }],     %% Unique integer
     Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
     %% Assert:
     case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of

+ 4 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -24,6 +24,8 @@
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 %% internal exports:
+-export([db_dir/1]).
+
 -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -132,7 +134,7 @@ open_shard(Shard, Options) ->
 
 -spec drop_shard(shard_id()) -> ok.
 drop_shard(Shard) ->
-    emqx_ds_storage_layer_sup:stop_shard(Shard),
+    catch emqx_ds_storage_layer_sup:stop_shard(Shard),
     ok = rocksdb:destroy(db_dir(Shard), []).
 
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
@@ -368,7 +370,7 @@ rocksdb_open(Shard, Options) ->
 
 -spec db_dir(shard_id()) -> file:filename().
 db_dir({DB, ShardId}) ->
-    filename:join(["data", atom_to_list(DB), atom_to_list(ShardId)]).
+    filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]).
 
 %%--------------------------------------------------------------------------------
 %% Schema access