|
|
@@ -40,6 +40,7 @@
|
|
|
|
|
|
-export_type([]).
|
|
|
|
|
|
+-include_lib("emqx_utils/include/emqx_message.hrl").
|
|
|
-include_lib("snabbkaffe/include/trace.hrl").
|
|
|
|
|
|
%%================================================================================
|
|
|
@@ -49,8 +50,16 @@
|
|
|
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
|
|
|
-define(flush, flush).
|
|
|
|
|
|
--record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
|
|
|
--record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}).
|
|
|
+-record(enqueue_req, {
|
|
|
+ message :: emqx_types:message(),
|
|
|
+ sync :: boolean(),
|
|
|
+ payload_bytes :: non_neg_integer()
|
|
|
+}).
|
|
|
+-record(enqueue_atomic_req, {
|
|
|
+ batch :: [emqx_types:message()],
|
|
|
+ sync :: boolean(),
|
|
|
+ payload_bytes :: non_neg_integer()
|
|
|
+}).
|
|
|
|
|
|
%%================================================================================
|
|
|
%% API functions
|
|
|
@@ -73,7 +82,8 @@ store_batch(DB, Messages, Opts) ->
|
|
|
?via(DB, Shard),
|
|
|
#enqueue_req{
|
|
|
message = Message,
|
|
|
- sync = Sync
|
|
|
+ sync = Sync,
|
|
|
+ payload_bytes = payload_size(Message)
|
|
|
},
|
|
|
infinity
|
|
|
)
|
|
|
@@ -83,11 +93,19 @@ store_batch(DB, Messages, Opts) ->
|
|
|
true ->
|
|
|
maps:foreach(
|
|
|
fun(Shard, Batch) ->
|
|
|
+ PayloadBytes = lists:foldl(
|
|
|
+ fun(Msg, Acc) ->
|
|
|
+ Acc + payload_size(Msg)
|
|
|
+ end,
|
|
|
+ 0,
|
|
|
+ Batch
|
|
|
+ ),
|
|
|
gen_server:call(
|
|
|
?via(DB, Shard),
|
|
|
#enqueue_atomic_req{
|
|
|
batch = Batch,
|
|
|
- sync = Sync
|
|
|
+ sync = Sync,
|
|
|
+ payload_bytes = PayloadBytes
|
|
|
},
|
|
|
infinity
|
|
|
)
|
|
|
@@ -108,7 +126,9 @@ store_batch(DB, Messages, Opts) ->
|
|
|
-record(s, {
|
|
|
db :: emqx_ds:db(),
|
|
|
shard :: emqx_ds_replication_layer:shard_id(),
|
|
|
+ metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
|
|
|
n = 0 :: non_neg_integer(),
|
|
|
+ n_bytes = 0 :: non_neg_integer(),
|
|
|
tref :: reference(),
|
|
|
batch = [] :: [emqx_types:message()],
|
|
|
pending_replies = [] :: [gen_server:from()]
|
|
|
@@ -117,18 +137,21 @@ store_batch(DB, Messages, Opts) ->
|
|
|
init([DB, Shard]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
process_flag(message_queue_data, off_heap),
|
|
|
+ MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
|
|
|
+ ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
|
|
|
S = #s{
|
|
|
db = DB,
|
|
|
shard = Shard,
|
|
|
+ metrics_id = MetricsId,
|
|
|
tref = start_timer()
|
|
|
},
|
|
|
{ok, S}.
|
|
|
|
|
|
-handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
|
|
|
- do_enqueue(From, Sync, Msg, S);
|
|
|
-handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) ->
|
|
|
+handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) ->
|
|
|
+ do_enqueue(From, Sync, Msg, NBytes, S);
|
|
|
+handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) ->
|
|
|
Len = length(Batch),
|
|
|
- do_enqueue(From, Sync, {atomic, Len, Batch}, S);
|
|
|
+ do_enqueue(From, Sync, {atomic, Len, NBytes, Batch}, NBytes, S);
|
|
|
handle_call(_Call, _From, S) ->
|
|
|
{reply, {error, unknown_call}, S}.
|
|
|
|
|
|
@@ -161,6 +184,11 @@ do_flush(
|
|
|
) ->
|
|
|
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of
|
|
|
ok ->
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id),
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n),
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes),
|
|
|
+ lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
|
|
|
+ true = erlang:garbage_collect(),
|
|
|
?tp(
|
|
|
emqx_ds_replication_layer_egress_flush,
|
|
|
#{db => DB, shard => Shard, batch => Messages}
|
|
|
@@ -169,6 +197,7 @@ do_flush(
|
|
|
true = erlang:garbage_collect(),
|
|
|
ok;
|
|
|
Error ->
|
|
|
+ emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id),
|
|
|
true = erlang:garbage_collect(),
|
|
|
?tp(
|
|
|
warning,
|
|
|
@@ -184,19 +213,27 @@ do_flush(
|
|
|
end,
|
|
|
S#s{
|
|
|
n = 0,
|
|
|
+ n_bytes = 0,
|
|
|
batch = [],
|
|
|
pending_replies = [],
|
|
|
tref = start_timer()
|
|
|
}.
|
|
|
|
|
|
-do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
|
|
|
+do_enqueue(
|
|
|
+ From,
|
|
|
+ Sync,
|
|
|
+ MsgOrBatch,
|
|
|
+ BatchBytes,
|
|
|
+ S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies}
|
|
|
+) ->
|
|
|
+ NBytes = NBytes0 + BatchBytes,
|
|
|
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
|
|
|
S1 =
|
|
|
case MsgOrBatch of
|
|
|
{atomic, NumMsgs, Msgs} ->
|
|
|
- S0#s{n = N + NumMsgs, batch = Msgs ++ Batch};
|
|
|
+ S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch};
|
|
|
Msg ->
|
|
|
- S0#s{n = N + 1, batch = [Msg | Batch]}
|
|
|
+ S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]}
|
|
|
end,
|
|
|
%% TODO: later we may want to delay the reply until the message is
|
|
|
%% replicated, but it requies changes to the PUBACK/PUBREC flow to
|
|
|
@@ -228,3 +265,8 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
|
|
|
start_timer() ->
|
|
|
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
|
|
|
erlang:send_after(Interval, self(), ?flush).
|
|
|
+
|
|
|
+%% @doc Return approximate size of the MQTT message (it doesn't take
|
|
|
+%% all things into account, for example headers and extras)
|
|
|
+payload_size(#message{payload = P, topic = T}) ->
|
|
|
+ size(P) + size(T).
|