Sfoglia il codice sorgente

feat(ds): add dedicated `#message_matcher{}` for preconditions

Andrew Mayorov 1 anno fa
parent
commit
0e545ffcec

+ 1 - 3
apps/emqx/src/emqx_types.erl

@@ -82,7 +82,6 @@
 -export_type([
     payload/0,
     message/0,
-    message/1,
     flag/0,
     flags/0,
     headers/0
@@ -239,8 +238,7 @@
 -type subscription() :: #subscription{}.
 -type subscriber() :: {pid(), subid()}.
 -type payload() :: binary() | iodata().
--type message() :: #message{payload :: emqx_types:payload()}.
--type message(Payload) :: #message{payload :: Payload}.
+-type message() :: #message{}.
 -type flag() :: sys | dup | retain | atom().
 -type flags() :: #{flag() := boolean()}.
 -type headers() :: #{

+ 19 - 0
apps/emqx_durable_storage/include/emqx_ds.hrl

@@ -21,4 +21,23 @@
     preconditions = [] :: [emqx_ds:precondition()]
 }).
 
+-record(message_matcher, {
+    %% Fields identifying the message:
+    %% Client identifier
+    from :: binary(),
+    %% Topic that the message is published to
+    topic :: emqx_types:topic(),
+    %% Timestamp (Unit: millisecond)
+    timestamp :: integer(),
+
+    %% Fields the message is matched against:
+    %% Message Payload
+    payload,
+    %% Message headers
+    headers = #{} :: emqx_types:headers(),
+    %% Extra filters
+    %% Reserved for the forward compatibility purposes.
+    filters = #{}
+}).
+
 -endif.

+ 4 - 4
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -99,8 +99,8 @@
 
 -type message() :: emqx_types:message().
 
-%% Message pattern.
--type message(Pattern) :: emqx_types:message(Pattern).
+%% Message matcher.
+-type message_matcher(Payload) :: #message_matcher{payload :: Payload}.
 
 %% A batch of storage operations.
 -type batch() :: [operation()] | #dsbatch{}.
@@ -110,7 +110,7 @@
     message()
     %% Delete a message.
     %% Does nothing if the message does not exist.
-    | {delete, message(_)}.
+    | {delete, message_matcher('_')}.
 
 %% Precondition.
 %% Fails whole batch if the storage already has the matching message (`if_exists'),
@@ -121,7 +121,7 @@
 %% Note: backends may not support this, but if they do only DBs with `atomic_batches'
 %% enabled are expected to support preconditions in batches.
 -type precondition() ::
-    {if_exists | unless_exists, message(iodata() | '_')}.
+    {if_exists | unless_exists, message_matcher(iodata() | '_')}.
 
 -type rank_x() :: term().
 

+ 1 - 1
apps/emqx_utils/include/emqx_message.hrl

@@ -33,7 +33,7 @@
     %% Topic that the message is published to
     topic :: emqx_types:topic(),
     %% Message Payload
-    payload,
+    payload :: emqx_types:payload(),
     %% Timestamp (Unit: millisecond)
     timestamp :: integer(),
     %% Miscellaneous extensions, currently used for OpenTelemetry context propagation