Преглед изворни кода

Merge pull request #12015 from ieQu1/ds-lts-dont-store-message

feat(ds): Don't store #message record in the DB
ieQu1 пре 2 година
родитељ
комит
1da9ec1d7d
1 измењених фајлова са 67 додато и 2 уклоњено
  1. 67 2
      apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

+ 67 - 2
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -99,8 +99,27 @@
 %% Limit on the number of wildcard levels in the learned topic trie:
 -define(WILDCARD_LIMIT, 10).
 
+%% Persistent (durable) term representing `#message{}' record. Must
+%% not change.
+-type value_v1() ::
+    {
+        _Id :: binary(),
+        _Qos :: 0..2,
+        _From :: atom() | binary(),
+        _Flags :: emqx_types:flags(),
+        _Headsers :: emqx_types:headers(),
+        _Topic :: emqx_types:topic(),
+        _Payload :: emqx_types:payload(),
+        _Timestamp :: integer(),
+        _Extra :: term()
+    }.
+
 -include("emqx_ds_bitmask.hrl").
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 %%================================================================================
 %% API funcions
 %%================================================================================
@@ -389,11 +408,39 @@ hash_topic_level(TopicLevel) ->
     <<Int:64, _/binary>> = erlang:md5(TopicLevel),
     Int.
 
+-spec message_to_value_v1(emqx_types:message()) -> value_v1().
+message_to_value_v1(#message{
+    id = Id,
+    qos = Qos,
+    from = From,
+    flags = Flags,
+    headers = Headers,
+    topic = Topic,
+    payload = Payload,
+    timestamp = Timestamp,
+    extra = Extra
+}) ->
+    {Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}.
+
+-spec value_v1_to_message(value_v1()) -> emqx_types:message().
+value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}) ->
+    #message{
+        id = Id,
+        qos = Qos,
+        from = From,
+        flags = Flags,
+        headers = Headers,
+        topic = Topic,
+        payload = Payload,
+        timestamp = Timestamp,
+        extra = Extra
+    }.
+
 serialize(Msg) ->
-    term_to_binary(Msg).
+    term_to_binary(message_to_value_v1(Msg)).
 
 deserialize(Blob) ->
-    binary_to_term(Blob).
+    value_v1_to_message(binary_to_term(Blob)).
 
 -define(BYTE_SIZE, 8).
 
@@ -452,3 +499,21 @@ data_cf(GenId) ->
 -spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
 trie_cf(GenId) ->
     "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId).
+
+-ifdef(TEST).
+
+serialize_deserialize_test() ->
+    Msg = #message{
+        id = <<"message_id_val">>,
+        qos = 2,
+        from = <<"from_val">>,
+        flags = #{sys => true, dup => true},
+        headers = #{foo => bar},
+        topic = <<"topic/value">>,
+        payload = [<<"foo">>, <<"bar">>],
+        timestamp = 42424242,
+        extra = "extra_val"
+    },
+    ?assertEqual(Msg, deserialize(serialize(Msg))).
+
+-endif.