|
|
@@ -70,20 +70,28 @@ store_batch(DB, Messages, Opts) ->
|
|
|
lists:foreach(
|
|
|
fun(Message) ->
|
|
|
Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
|
|
|
- gen_server:call(?via(DB, Shard), #enqueue_req{
|
|
|
- message = Message,
|
|
|
- sync = Sync
|
|
|
- })
|
|
|
+ gen_server:call(
|
|
|
+ ?via(DB, Shard),
|
|
|
+ #enqueue_req{
|
|
|
+ message = Message,
|
|
|
+ sync = Sync
|
|
|
+ },
|
|
|
+ infinity
|
|
|
+ )
|
|
|
end,
|
|
|
Messages
|
|
|
);
|
|
|
true ->
|
|
|
maps:foreach(
|
|
|
fun(Shard, Batch) ->
|
|
|
- gen_server:call(?via(DB, Shard), #enqueue_atomic_req{
|
|
|
- batch = Batch,
|
|
|
- sync = Sync
|
|
|
- })
|
|
|
+ gen_server:call(
|
|
|
+ ?via(DB, Shard),
|
|
|
+ #enqueue_atomic_req{
|
|
|
+ batch = Batch,
|
|
|
+ sync = Sync
|
|
|
+ },
|
|
|
+ infinity
|
|
|
+ )
|
|
|
end,
|
|
|
maps:groups_from_list(
|
|
|
fun(Message) ->
|