|
|
@@ -159,7 +159,7 @@ cast(Msg) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
|
|
|
publish(From, #mqtt_message{topic=Topic} = Msg) ->
|
|
|
- lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]),
|
|
|
+ trace(publish, From, Msg),
|
|
|
%% Retain message first. Don't create retained topic.
|
|
|
case emqttd_msg_store:retain(Msg) of
|
|
|
ok ->
|
|
|
@@ -169,7 +169,7 @@ publish(From, #mqtt_message{topic=Topic} = Msg) ->
|
|
|
publish(From, Topic, Msg)
|
|
|
end.
|
|
|
|
|
|
-publish(_From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
|
|
+publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
|
|
lists:foreach(
|
|
|
fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) ->
|
|
|
Msg1 = if
|
|
|
@@ -450,3 +450,16 @@ setstats(dropped, false) ->
|
|
|
setstats(dropped, true) ->
|
|
|
emqttd_metrics:inc('messages/dropped').
|
|
|
|
|
|
+
|
|
|
+%%%=============================================================================
|
|
|
+%%% Trace functions
|
|
|
+%%%=============================================================================
|
|
|
+
|
|
|
+trace(publish, From, _Msg) when is_atom(From) ->
|
|
|
+ %%dont' trace broker publish
|
|
|
+ ignore;
|
|
|
+
|
|
|
+trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
|
|
+ lager:info([{client, From}, {topic, Topic}],
|
|
|
+ "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
|
|
+
|