Pārlūkot izejas kodu

chore: revert changes that handle possible failures when creating sessions

Since we won't be storing session data in DS in this first version for community and
enterprise editions, we'll ignore this possibility here to simplify code.
Thales Macedo Garitezi 1 gadu atpakaļ
vecāks
revīzija
93cf546c8c

+ 4 - 28
apps/emqx/src/emqx_cm.erl

@@ -145,9 +145,6 @@
     (is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
     (is_binary(CLIENTID) orelse (is_atom(CLIENTID) andalso CLIENTID =/= undefined))
 ).
 ).
 
 
-%% When dealing with race conditions during new session creation.
--define(MAX_RETRIES, 5).
-
 %% linting overrides
 %% linting overrides
 -elvis([
 -elvis([
     {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
     {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}},
@@ -308,31 +305,10 @@ open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo
         end
         end
     end).
     end).
 
 
-%% Used only when create a *new* session, not resuming a new one.
-create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, ChanPid) ->
-    Attempts = 0,
-    create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, ChanPid, Attempts).
-
-create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid, N) ->
-    case emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg) of
-        {error, recoverable, session_already_exists} when N < ?MAX_RETRIES ->
-            %% Race condition: session data could still have been replicating and now
-            %% became visible as we attempted to delete it in `open_session'; try again.
-            ok = emqx_session:destroy(ClientInfo, ConnInfo),
-            timer:sleep(100),
-            create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, ChanPid, N + 1);
-        {error, recoverable, session_already_exists} ->
-            {error, failed_to_create_new_session};
-        {error, recoverable, Reason} when N < ?MAX_RETRIES ->
-            ?SLOG(info, #{msg => "cm_failed_to_create_new_session", reason => Reason, retry => true}),
-            timer:sleep(100),
-            create_register_session(ClientInfo, ConnInfo, MaybeWillMsg, ChanPid, N + 1);
-        {error, unrecoverable, Reason} ->
-            {error, Reason};
-        {ok, Session} ->
-            ok = register_channel(ClientId, ChanPid, ConnInfo),
-            {ok, #{session => Session, present => false}}
-    end.
+create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, MaybeWillMsg, ChanPid) ->
+    Session = emqx_session:create(ClientInfo, ConnInfo, MaybeWillMsg),
+    ok = register_channel(ClientId, ChanPid, ConnInfo),
+    {ok, #{session => Session, present => false}}.
 
 
 %% @doc Try to takeover a session from existing channel.
 %% @doc Try to takeover a session from existing channel.
 -spec takeover_session_begin(emqx_types:clientid()) ->
 -spec takeover_session_begin(emqx_types:clientid()) ->

+ 11 - 21
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -235,14 +235,9 @@
 %%
 %%
 
 
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
-    {ok, session()} | emqx_ds:error(session_already_exists).
+    session().
 create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
 create(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
-    case session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
-        {ok, Session} ->
-            {ok, ensure_timers(Session)};
-        {error, _Class, _Reason} = Error ->
-            Error
-    end.
+    ensure_timers(session_ensure_new(ClientID, ClientInfo, ConnInfo, MaybeWillMsg, Conf)).
 
 
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
     {_IsPresent :: true, session(), []} | false.
     {_IsPresent :: true, session(), []} | false.
@@ -923,7 +918,7 @@ session_open(
     emqx_maybe:t(message()),
     emqx_maybe:t(message()),
     emqx_session:conf()
     emqx_session:conf()
 ) ->
 ) ->
-    {ok, session()} | emqx_ds:error(session_already_exists).
+    session().
 session_ensure_new(
 session_ensure_new(
     Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf
     Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf
 ) ->
 ) ->
@@ -951,19 +946,14 @@ session_ensure_new(
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S6 = set_clientinfo(ClientInfo, S5),
     S6 = set_clientinfo(ClientInfo, S5),
     S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
     S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
-    try emqx_persistent_session_ds_state:commit(S7, #{ensure_new => true}) of
-        S ->
-            {ok, #{
-                id => Id,
-                props => Conf,
-                s => S,
-                shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
-                inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
-            }}
-    catch
-        throw:session_already_exists ->
-            {error, recoverable, session_already_exists}
-    end.
+    S = emqx_persistent_session_ds_state:commit(S7, #{ensure_new => true}),
+    #{
+        id => Id,
+        props => Conf,
+        s => S,
+        shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
+        inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
+    }.
 
 
 %% @doc Called when a client reconnects with `clean session=true' or
 %% @doc Called when a client reconnects with `clean session=true' or
 %% during session GC
 %% during session GC

+ 2 - 8
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -516,7 +516,7 @@ commit(
     },
     },
     MetadataVal = #{metadata => Metadata, key_mappings => key_mappings(Rec)},
     MetadataVal = #{metadata => Metadata, key_mappings => key_mappings(Rec)},
     MetadataOp = to_domain_msg(?metadata_domain, SessionId, _Key = undefined, MetadataVal),
     MetadataOp = to_domain_msg(?metadata_domain, SessionId, _Key = undefined, MetadataVal),
-    Res = store_batch(
+    ok = store_batch(
         SessionId,
         SessionId,
         lists:flatten([
         lists:flatten([
             {?TS, MetadataOp},
             {?TS, MetadataOp},
@@ -529,13 +529,7 @@ commit(
         ]),
         ]),
         Opts
         Opts
     ),
     ),
-    case Res of
-        {error, unrecoverable, {precondition_failed, _Msg}} ->
-            %% Race: the session already exists.
-            throw(session_already_exists);
-        ok ->
-            Rec
-    end.
+    Rec.
 
 
 key_mappings(Rec) ->
 key_mappings(Rec) ->
     lists:foldl(
     lists:foldl(

+ 7 - 21
apps/emqx/src/emqx_session.erl

@@ -183,7 +183,7 @@
 %% -------------------------------------------------------------------
 %% -------------------------------------------------------------------
 
 
 -callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
 -callback create(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
-    {ok, t()} | {error, recoverable | unrecoverable, term()}.
+    t().
 -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
 -callback open(clientinfo(), conninfo(), emqx_maybe:t(message()), conf()) ->
     {_IsPresent :: true, t(), _ReplayContext} | false.
     {_IsPresent :: true, t(), _ReplayContext} | false.
 -callback destroy(t() | clientinfo()) -> ok.
 -callback destroy(t() | clientinfo()) -> ok.
@@ -198,8 +198,7 @@
 %% Create a Session
 %% Create a Session
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
--spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) ->
-    {ok, t()} | {error, recoverable | unrecoverable, term()}.
+-spec create(clientinfo(), conninfo(), emqx_maybe:t(message())) -> t().
 create(ClientInfo, ConnInfo, MaybeWillMsg) ->
 create(ClientInfo, ConnInfo, MaybeWillMsg) ->
     Conf = get_session_conf(ClientInfo),
     Conf = get_session_conf(ClientInfo),
     % FIXME error conditions
     % FIXME error conditions
@@ -209,14 +208,10 @@ create(ClientInfo, ConnInfo, MaybeWillMsg) ->
 
 
 create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
 create(Mod, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
     % FIXME error conditions
     % FIXME error conditions
-    case Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
-        {ok, Session} ->
-            ok = emqx_metrics:inc('session.created'),
-            ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]),
-            {ok, Session};
-        {error, _Class, _Reason} = Error ->
-            Error
-    end.
+    Session = Mod:create(ClientInfo, ConnInfo, MaybeWillMsg, Conf),
+    ok = emqx_metrics:inc('session.created'),
+    ok = emqx_hooks:run('session.created', [ClientInfo, info(Session)]),
+    Session.
 
 
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) ->
 -spec open(clientinfo(), conninfo(), emqx_maybe:t(message())) ->
     {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
     {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
@@ -232,16 +227,7 @@ open(ClientInfo, ConnInfo, MaybeWillMsg) ->
         false ->
         false ->
             %% NOTE
             %% NOTE
             %% Nothing was found, create a new session with the `Default` implementation.
             %% Nothing was found, create a new session with the `Default` implementation.
-            %% TODO: other error conditions?
-            case create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf) of
-                {error, recoverable, session_already_exists} ->
-                    %% Race condition: session data could still have been replicating and
-                    %% now became visible as we attempted to create a fresh one; try
-                    %% again.
-                    open(ClientInfo, ConnInfo, MaybeWillMsg);
-                {ok, Session} ->
-                    {false, Session}
-            end
+            {false, create(Default, ClientInfo, ConnInfo, MaybeWillMsg, Conf)}
     end.
     end.
 
 
 try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
 try_open([Mod | Rest], ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->

+ 3 - 3
apps/emqx/src/emqx_session_mem.erl

@@ -162,7 +162,7 @@
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
 -spec create(clientinfo(), conninfo(), emqx_maybe:t(message()), emqx_session:conf()) ->
-    {ok, session()}.
+    session().
 create(
 create(
     #{zone := Zone, clientid := ClientId},
     #{zone := Zone, clientid := ClientId},
     #{expiry_interval := EI, receive_maximum := ReceiveMax},
     #{expiry_interval := EI, receive_maximum := ReceiveMax},
@@ -170,7 +170,7 @@ create(
     Conf
     Conf
 ) ->
 ) ->
     QueueOpts = get_mqueue_conf(Zone),
     QueueOpts = get_mqueue_conf(Zone),
-    {ok, #session{
+    #session{
         id = emqx_guid:gen(),
         id = emqx_guid:gen(),
         clientid = ClientId,
         clientid = ClientId,
         created_at = erlang:system_time(millisecond),
         created_at = erlang:system_time(millisecond),
@@ -185,7 +185,7 @@ create(
         upgrade_qos = maps:get(upgrade_qos, Conf),
         upgrade_qos = maps:get(upgrade_qos, Conf),
         retry_interval = maps:get(retry_interval, Conf),
         retry_interval = maps:get(retry_interval, Conf),
         await_rel_timeout = maps:get(await_rel_timeout, Conf)
         await_rel_timeout = maps:get(await_rel_timeout, Conf)
-    }}.
+    }.
 
 
 get_mqueue_conf(Zone) ->
 get_mqueue_conf(Zone) ->
     #{
     #{

+ 1 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -1096,7 +1096,7 @@ connpkt(Props) ->
 session() -> session(#{zone => default, clientid => <<"fake-test">>}, #{}).
 session() -> session(#{zone => default, clientid => <<"fake-test">>}, #{}).
 session(InitFields) -> session(#{zone => default, clientid => <<"fake-test">>}, InitFields).
 session(InitFields) -> session(#{zone => default, clientid => <<"fake-test">>}, InitFields).
 session(ClientInfo, InitFields) when is_map(InitFields) ->
 session(ClientInfo, InitFields) when is_map(InitFields) ->
-    {ok, Session} = emqx_session:create(
+    Session = emqx_session:create(
         ClientInfo,
         ClientInfo,
         #{
         #{
             receive_maximum => 0,
             receive_maximum => 0,

+ 1 - 1
apps/emqx/test/emqx_connection_SUITE.erl

@@ -685,7 +685,7 @@ channel(InitFields) ->
         is_superuser => false,
         is_superuser => false,
         mountpoint => undefined
         mountpoint => undefined
     },
     },
-    {ok, Session} = emqx_session:create(
+    Session = emqx_session:create(
         ClientInfo,
         ClientInfo,
         #{receive_maximum => 0, expiry_interval => 1000},
         #{receive_maximum => 0, expiry_interval => 1000},
         _WillMsg = undefined
         _WillMsg = undefined

+ 1 - 1
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -324,7 +324,7 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result)
     ?assertEqual(
     ?assertEqual(
         maps:get(Key, element(Idx, Record), undefined),
         maps:get(Key, element(Idx, Record), undefined),
         Result,
         Result,
-        #{session_id => SessionId, key => Key, 'fun' => Fun, st => get_state(SessionId)}
+        #{session_id => SessionId, key => Key, 'fun' => Fun}
     ),
     ),
     true;
     true;
 postcondition(_, _, _) ->
 postcondition(_, _, _) ->

+ 2 - 2
apps/emqx/test/emqx_session_mem_SUITE.erl

@@ -65,7 +65,7 @@ end_per_suite(Config) ->
 t_session_init(_) ->
 t_session_init(_) ->
     ClientInfo = #{zone => default, clientid => <<"fake-test">>},
     ClientInfo = #{zone => default, clientid => <<"fake-test">>},
     ConnInfo = #{receive_maximum => 64, expiry_interval => 0},
     ConnInfo = #{receive_maximum => 64, expiry_interval => 0},
-    {ok, Session} = emqx_session_mem:create(
+    Session = emqx_session_mem:create(
         ClientInfo,
         ClientInfo,
         ConnInfo,
         ConnInfo,
         _WillMsg = undefined,
         _WillMsg = undefined,
@@ -605,7 +605,7 @@ session() -> session(#{}).
 session(InitFields) when is_map(InitFields) ->
 session(InitFields) when is_map(InitFields) ->
     ClientInfo = #{zone => default, clientid => <<"fake-test">>},
     ClientInfo = #{zone => default, clientid => <<"fake-test">>},
     ConnInfo = #{receive_maximum => 0, expiry_interval => 0},
     ConnInfo = #{receive_maximum => 0, expiry_interval => 0},
-    {ok, Session} = emqx_session_mem:create(
+    Session = emqx_session_mem:create(
         ClientInfo,
         ClientInfo,
         ConnInfo,
         ConnInfo,
         _WillMsg = undefined,
         _WillMsg = undefined,

+ 1 - 1
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -611,7 +611,7 @@ channel(InitFields) ->
         is_superuser => false,
         is_superuser => false,
         mountpoint => undefined
         mountpoint => undefined
     },
     },
-    {ok, Session} = emqx_session:create(
+    Session = emqx_session:create(
         ClientInfo,
         ClientInfo,
         #{receive_maximum => 0, expiry_interval => 0},
         #{receive_maximum => 0, expiry_interval => 0},
         _WillMsg = undefined
         _WillMsg = undefined

+ 1 - 18
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -274,7 +274,7 @@ make_batch(_ForceMonotonic = true, Latest, Messages) ->
 make_batch(false, Latest, Messages) ->
 make_batch(false, Latest, Messages) ->
     assign_operation_timestamps(Latest, Messages, []).
     assign_operation_timestamps(Latest, Messages, []).
 
 
-assign_monotonic_timestamps(Latest0, [#message{} = Message | Rest], Acc0) ->
+assign_monotonic_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
     case emqx_message:timestamp(Message, microsecond) of
     case emqx_message:timestamp(Message, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs;
             Latest = TimestampUs;
@@ -283,15 +283,6 @@ assign_monotonic_timestamps(Latest0, [#message{} = Message | Rest], Acc0) ->
     end,
     end,
     Acc = [assign_timestamp(Latest, Message) | Acc0],
     Acc = [assign_timestamp(Latest, Message) | Acc0],
     assign_monotonic_timestamps(Latest, Rest, Acc);
     assign_monotonic_timestamps(Latest, Rest, Acc);
-assign_monotonic_timestamps(Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc0) ->
-    case Timestamp > Latest0 of
-        true ->
-            Latest = Timestamp;
-        false ->
-            Latest = Latest0 + 1
-    end,
-    Acc = [assign_timestamp(Timestamp, Message0) | Acc0],
-    assign_monotonic_timestamps(Latest, Rest, Acc);
 assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
 assign_monotonic_timestamps(Latest, [Operation | Rest], Acc0) ->
     Acc = [Operation | Acc0],
     Acc = [Operation | Acc0],
     assign_monotonic_timestamps(Latest, Rest, Acc);
     assign_monotonic_timestamps(Latest, Rest, Acc);
@@ -303,10 +294,6 @@ assign_operation_timestamps(Latest0, [Message = #message{} | Rest], Acc0) ->
     Latest = max(TimestampUs, Latest0),
     Latest = max(TimestampUs, Latest0),
     Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
     Acc = [assign_timestamp(TimestampUs, Message) | Acc0],
     assign_operation_timestamps(Latest, Rest, Acc);
     assign_operation_timestamps(Latest, Rest, Acc);
-assign_operation_timestamps(Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc0) ->
-    Latest = max(Timestamp, Latest0),
-    Acc = [assign_timestamp(Timestamp, Message0) | Acc0],
-    assign_operation_timestamps(Latest, Rest, Acc);
 assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
 assign_operation_timestamps(Latest, [Operation | Rest], Acc0) ->
     Acc = [Operation | Acc0],
     Acc = [Operation | Acc0],
     assign_operation_timestamps(Latest, Rest, Acc);
     assign_operation_timestamps(Latest, Rest, Acc);
@@ -317,10 +304,6 @@ assign_timestamp(TimestampUs, Message) ->
     {TimestampUs, Message}.
     {TimestampUs, Message}.
 
 
 -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
 -spec shard_of_operation(emqx_ds:db(), emqx_ds:operation(), clientid | topic, _Options) -> shard().
-shard_of_operation(DB, {Timestamp, #message{} = Message}, SerializeBy, Options) when
-    is_integer(Timestamp)
-->
-    shard_of_operation(DB, Message, SerializeBy, Options);
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
     case SerializeBy of
     case SerializeBy of
         clientid -> Key = From;
         clientid -> Key = From;

+ 2 - 16
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -94,8 +94,6 @@
 
 
 -type shard_id() :: binary().
 -type shard_id() :: binary().
 
 
--type message() :: emqx_types:message() | {emqx_ds:time(), emqx_types:message()}.
-
 -type builtin_db_opts() ::
 -type builtin_db_opts() ::
     #{
     #{
         backend := builtin,
         backend := builtin,
@@ -296,9 +294,6 @@ get_streams(DB, TopicFilter, StartTime) ->
                 Streams when is_list(Streams) ->
                 Streams when is_list(Streams) ->
                     ok;
                     ok;
                 {error, _Class, _Reason} ->
                 {error, _Class, _Reason} ->
-                    ?tp(ds_get_streams_failed, #{
-                        db => DB, shard => Shard, class => _Class, reason => _Reason
-                    }),
                     %% TODO: log error
                     %% TODO: log error
                     Streams = []
                     Streams = []
             end,
             end,
@@ -420,7 +415,7 @@ current_timestamp(DB, Shard) ->
 init_buffer(_DB, _Shard, _Options) ->
 init_buffer(_DB, _Shard, _Options) ->
     {ok, #bs{}}.
     {ok, #bs{}}.
 
 
--spec flush_buffer(emqx_ds:db(), shard_id(), [message()], egress_state()) ->
+-spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) ->
     {egress_state(), ok | emqx_ds:error(_)}.
     {egress_state(), ok | emqx_ds:error(_)}.
 flush_buffer(DB, Shard, Messages, State) ->
 flush_buffer(DB, Shard, Messages, State) ->
     case ra_store_batch(DB, Shard, Messages) of
     case ra_store_batch(DB, Shard, Messages) of
@@ -438,10 +433,6 @@ flush_buffer(DB, Shard, Messages, State) ->
     _Options
     _Options
 ) ->
 ) ->
     emqx_ds_replication_layer:shard_id().
     emqx_ds_replication_layer:shard_id().
-shard_of_operation(DB, {Timestamp, #message{} = Message}, SerializeBy, Options) when
-    is_integer(Timestamp)
-->
-    shard_of_operation(DB, Message, SerializeBy, Options);
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
 shard_of_operation(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
     case SerializeBy of
     case SerializeBy of
         clientid -> Key = From;
         clientid -> Key = From;
@@ -1040,7 +1031,7 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
     handle_custom_event(DBShard, Timestamp, tick).
     handle_custom_event(DBShard, Timestamp, tick).
 
 
-assign_timestamps(true, Latest0, [#message{} = Message0 | Rest], Acc, N, Sz) ->
+assign_timestamps(true, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
     case emqx_message:timestamp(Message0, microsecond) of
     case emqx_message:timestamp(Message0, microsecond) of
         TimestampUs when TimestampUs > Latest0 ->
         TimestampUs when TimestampUs > Latest0 ->
             Latest = TimestampUs,
             Latest = TimestampUs,
@@ -1057,11 +1048,6 @@ assign_timestamps(false, Latest0, [Message0 = #message{} | Rest], Acc, N, Sz) ->
     Message = assign_timestamp(TimestampUs, Message0),
     Message = assign_timestamp(TimestampUs, Message0),
     MSize = approx_message_size(Message0),
     MSize = approx_message_size(Message0),
     assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
     assign_timestamps(false, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
-assign_timestamps(ForceMonotonic, Latest0, [{Timestamp, #message{} = Message0} | Rest], Acc, N, Sz) ->
-    Latest = max(Latest0, Timestamp),
-    Message = assign_timestamp(Timestamp, Message0),
-    MSize = approx_message_size(Message0),
-    assign_timestamps(ForceMonotonic, Latest, Rest, [Message | Acc], N + 1, Sz + MSize);
 assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
 assign_timestamps(ForceMonotonic, Latest, [Operation | Rest], Acc, N, Sz) ->
     assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
     assign_timestamps(ForceMonotonic, Latest, Rest, [Operation | Acc], N + 1, Sz);
 assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->
 assign_timestamps(_ForceMonotonic, Latest, [], Acc, N, Size) ->

+ 0 - 1
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -112,7 +112,6 @@
 -type operation() ::
 -type operation() ::
     %% Store a message.
     %% Store a message.
     message()
     message()
-    | {time(), message()}
     %% Delete a message.
     %% Delete a message.
     %% Does nothing if the message does not exist.
     %% Does nothing if the message does not exist.
     | deletion().
     | deletion().

+ 0 - 2
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -416,8 +416,6 @@ cancel_timer(S = #s{tref = TRef}) ->
 
 
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% @doc Return approximate size of the MQTT message (it doesn't take
 %% all things into account, for example headers and extras)
 %% all things into account, for example headers and extras)
-payload_size({_TS, #message{} = Message}) ->
-    payload_size(Message);
 payload_size(#message{payload = P, topic = T}) ->
 payload_size(#message{payload = P, topic = T}) ->
     size(P) + size(T);
     size(P) + size(T);
 payload_size({_OpName, _}) ->
 payload_size({_OpName, _}) ->

+ 1 - 2
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl

@@ -55,10 +55,9 @@
 init(ClientInfo, MaybeWillMsg) ->
 init(ClientInfo, MaybeWillMsg) ->
     ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
     ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
     SessionConf = emqx_session:get_session_conf(ClientInfo),
     SessionConf = emqx_session:get_session_conf(ClientInfo),
-    {ok, Session} = emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf),
     #{
     #{
         registry => emqx_mqttsn_registry:init(),
         registry => emqx_mqttsn_registry:init(),
-        session => Session
+        session => emqx_session_mem:create(ClientInfo, ConnInfo, MaybeWillMsg, SessionConf)
     }.
     }.
 
 
 registry(#{registry := Registry}) ->
 registry(#{registry := Registry}) ->