|
@@ -83,9 +83,14 @@ dispatch(Pid, Topic) ->
|
|
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
|
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
|
|
topic = Topic,
|
|
topic = Topic,
|
|
|
- payload = <<>>}, _Env) ->
|
|
|
|
|
|
|
+ payload = <<>>}, Env) ->
|
|
|
mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
|
mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
|
|
- {ok, Msg};
|
|
|
|
|
|
|
+ case stop_publish_clear_msg(Env) of
|
|
|
|
|
+ true ->
|
|
|
|
|
+ {ok, emqx_message:set_header(allow_publish, false, Msg)};
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ {ok, Msg}
|
|
|
|
|
+ end;
|
|
|
|
|
|
|
|
on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
|
on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
|
|
Msg1 = emqx_message:set_header(retained, true, Msg),
|
|
Msg1 = emqx_message:set_header(retained, true, Msg),
|
|
@@ -224,6 +229,9 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
|
|
"for payload is too big!", [Topic, iolist_size(Payload)])
|
|
"for payload is too big!", [Topic, iolist_size(Payload)])
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+stop_publish_clear_msg(Env) ->
|
|
|
|
|
+ proplists:get_bool(stop_publish_clear_msg, Env).
|
|
|
|
|
+
|
|
|
is_table_full(Env) ->
|
|
is_table_full(Env) ->
|
|
|
Limit = proplists:get_value(max_retained_messages, Env, 0),
|
|
Limit = proplists:get_value(max_retained_messages, Env, 0),
|
|
|
Limit > 0 andalso (retained_count() > Limit).
|
|
Limit > 0 andalso (retained_count() > Limit).
|