Преглед на файлове

Lazy evaluation when logging messages

Formatting variables and then passing them into the logger functions leads
to performance issues. i.e.

```erlang
logger:debug("RECV ~s", [emqx_packet:format(Packet)])
```

Above message will only be printed when the current log level set to
`debug`, but the function emqx_packet:format/1 will always be evaluated no
matter what the current log level is.

OTP 21 provides a special meta-data named `report_cb`, which can be used
for lazy evaluation. The fun is only evaluated if the primary/handler log level
check passes, and is therefore recommended if it is expensive to generate
the message.
terry-xiaoyu преди 7 години
родител
ревизия
2269967f1a
променени са 4 файла, в които са добавени 92 реда и са изтрити 64 реда
  1. 3 1
      priv/emqx.schema
  2. 24 20
      src/emqx_connection.erl
  3. 42 25
      src/emqx_protocol.erl
  4. 23 18
      src/emqx_ws_connection.erl

+ 3 - 1
priv/emqx.schema

@@ -437,7 +437,9 @@ end}.
                           [{peername,
                               [client_id,"@",peername," "],
                               [client_id, " "]}],
-                          []},
+                          [{peername,
+                              [peername," "],
+                              []}]},
                        msg,"\n"]}},
     FileConf =  fun(Filename) ->
                   #{type => wrap,

+ 24 - 20
src/emqx_connection.erl

@@ -50,9 +50,12 @@
 
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
--define(LOG(Level, Format, Args, State),
-        emqx_logger:Level("MQTT(~s): " ++ Format,
-                          [esockd_net:format(State#state.peername) | Args])).
+-define(LOG(Level, Format, Args),
+        emqx_logger:Level(#{header => "[TCP] ", format => Format, args => Args},
+                          #{report_cb =>
+                                fun(#{header := Hdr0, format := Fmt0, args := Args0}) ->
+                                    {Hdr0 ++ Fmt0, Args0}
+                                end})).
 
 start_link(Transport, Socket, Options) ->
     {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Socket, Options]])}.
@@ -153,6 +156,8 @@ init([Transport, RawSocket, Options]) ->
             GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),
             ok = emqx_gc:init(GcPolicy),
             ok = emqx_misc:init_proc_mng_policy(Zone),
+
+            emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
             gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
                                   State, self(), IdleTimout);
         {error, Reason} ->
@@ -169,7 +174,6 @@ send_fun(Transport, Socket, Peername) ->
         Data = emqx_frame:serialize(Packet, Options),
         try Transport:async_send(Socket, Data) of
             ok ->
-                ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
                 emqx_metrics:inc('bytes/sent', iolist_size(Data)),
                 ok;
             Error -> Error
@@ -195,11 +199,11 @@ handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
     {reply, emqx_protocol:session(ProtoState), State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "unexpected call: ~p", [Req], State),
+    ?LOG(error, "unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "unexpected cast: ~p", [Msg], State),
+    ?LOG(error, "unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
 handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
@@ -225,7 +229,7 @@ handle_info({timeout, Timer, emit_stats},
             ok = emqx_gc:reset(),
             {noreply, NewState, hibernate};
         {shutdown, Reason} ->
-            ?LOG(warning, "shutdown due to ~p", [Reason], NewState),
+            ?LOG(warning, "shutdown due to ~p", [Reason]),
             shutdown(Reason, NewState)
     end;
 handle_info(timeout, State) ->
@@ -235,18 +239,18 @@ handle_info({shutdown, Reason}, State) ->
     shutdown(Reason, State);
 
 handle_info({shutdown, discard, {ClientId, ByPid}}, State) ->
-    ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
+    ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
     shutdown(discard, State);
 
 handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
-    ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
+    ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     shutdown(conflict, State);
 
 handle_info(activate_sock, State) ->
     {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
 
 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
-    ?LOG(debug, "RECV ~p", [Data], State),
+    ?LOG(debug, "RECV ~p", [Data]),
     Size = iolist_size(Data),
     emqx_metrics:inc('bytes/received', Size),
     Incoming = #{bytes => Size, packets => 0},
@@ -262,7 +266,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
     shutdown(Reason, State);
 
 handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) ->
-    ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
+    ?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
     StatFun = fun() ->
                 case Transport:getstat(Socket, [recv_oct]) of
                     {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
@@ -287,14 +291,14 @@ handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
     end;
 
 handle_info(Info, State) ->
-    ?LOG(error, "unexpected info: ~p", [Info], State),
+    ?LOG(error, "unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(Reason, State = #state{transport   = Transport,
-                                 socket      = Socket,
-                                 keepalive   = KeepAlive,
-                                 proto_state = ProtoState}) ->
-    ?LOG(debug, "Terminated for ~p", [Reason], State),
+terminate(Reason, #state{transport   = Transport,
+                         socket      = Socket,
+                         keepalive   = KeepAlive,
+                         proto_state = ProtoState}) ->
+    ?LOG(debug, "Terminated for ~p", [Reason]),
     Transport:fast_close(Socket),
     emqx_keepalive:cancel(KeepAlive),
     case {ProtoState, Reason} of
@@ -330,7 +334,7 @@ handle_packet(Data, State = #state{proto_state  = ProtoState,
                     NewState = State#state{proto_state = ProtoState1},
                     handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
                 {error, Reason} ->
-                    ?LOG(error, "Process packet error - ~p", [Reason], State),
+                    ?LOG(error, "Process packet error - ~p", [Reason]),
                     shutdown(Reason, State);
                 {error, Reason, ProtoState1} ->
                     shutdown(Reason, State#state{proto_state = ProtoState1});
@@ -338,10 +342,10 @@ handle_packet(Data, State = #state{proto_state  = ProtoState,
                     stop(Error, State#state{proto_state = ProtoState1})
             end;
         {error, Error} ->
-            ?LOG(error, "Framing error - ~p", [Error], State),
+            ?LOG(error, "Framing error - ~p", [Error]),
             shutdown(Error, State);
         {'EXIT', Reason} ->
-            ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data], State),
+            ?LOG(error, "Parse failed for ~p~nError data:~p", [Reason, Data]),
             shutdown(parse_error, State)
     end.
 

+ 42 - 25
src/emqx_protocol.erl

@@ -72,8 +72,12 @@
 
 -define(NO_PROPS, undefined).
 
--define(LOG(Level, Format, Args, _PState),
-        emqx_logger:Level("[MQTT] " ++ Format, Args)).
+-define(LOG(Level, Format, Args),
+        emqx_logger:Level(#{header => "[MQTT] ", format => Format, args => Args},
+                          #{report_cb =>
+                                fun(#{header := Hdr0, format := Fmt0, args := Args0}) ->
+                                    {Hdr0 ++ Fmt0, Args0}
+                                end})).
 
 %%------------------------------------------------------------------------------
 %% Init
@@ -81,7 +85,6 @@
 
 -spec(init(map(), list()) -> state()).
 init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
-    emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
     Zone = proplists:get_value(zone, Options),
     #pstate{zone          =  Zone,
             sendfun       =  SendFun,
@@ -211,7 +214,7 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
 
 received(Packet = ?PACKET(Type), PState) ->
     PState1 =  set_protover(Packet, PState),
-    trace(recv, Packet, PState1),
+    trace(recv, Packet),
     try emqx_packet:validate(Packet) of
         true ->
             {Packet1, PState2} = preprocess_properties(Packet, PState1),
@@ -319,11 +322,11 @@ process_packet(?CONNECT_PACKET(
                               %% Success
                               {?RC_SUCCESS, SP, PState4};
                           {error, Error} ->
-                              ?LOG(error, "Failed to open session: ~p", [Error], PState1),
+                              ?LOG(error, "Failed to open session: ~p", [Error]),
                               {?RC_UNSPECIFIED_ERROR, PState1}
                       end;
                   {error, Reason} ->
-                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2),
+                      ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason]),
                       {?RC_NOT_AUTHORIZED, PState1}
               end;
           {error, ReasonCode} ->
@@ -335,26 +338,31 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt
         {ok, PState1} ->
             do_publish(Packet, PState1);
         {error, ?RC_TOPIC_ALIAS_INVALID} ->
-            ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID], PState),
+            ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]),
             {error, ?RC_TOPIC_ALIAS_INVALID, PState};
         {error, ReasonCode} ->
-            ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState),
+            ?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
+                [Topic, emqx_reason_codes:text(ReasonCode)]),
             {error, ReasonCode, PState}
     end;
 
-process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
+process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
         {ok, PState1} ->
             do_publish(Packet, PState1);
         {error, ReasonCode} ->
+            ?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
+                [Topic, emqx_reason_codes:text(ReasonCode)]),
             deliver({puback, PacketId, ReasonCode}, PState)
     end;
 
-process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
+process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
     case check_publish(Packet, PState) of
         {ok, PState1} ->
             do_publish(Packet, PState1);
         {error, ReasonCode} ->
+            ?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
+                [Topic, emqx_reason_codes:text(ReasonCode)]),
             deliver({pubrec, PacketId, ReasonCode}, PState)
     end;
 
@@ -404,11 +412,14 @@ process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
                     deliver({suback, PacketId, ReasonCodes}, PState)
             end;
         {error, TopicFilters} ->
-            ReasonCodes = lists:map(fun({_, #{rc := ?RC_SUCCESS}}) ->
-                                            ?RC_IMPLEMENTATION_SPECIFIC_ERROR;
-                                       ({_, #{rc := ReasonCode}}) ->
-                                            ReasonCode
-                                    end, TopicFilters),
+            {SubTopics, ReasonCodes} =
+                lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
+                                    {[Topic|Topics], [?RC_IMPLEMENTATION_SPECIFIC_ERROR | Codes]};
+                                ({Topic, #{rc := Code}}, {Topics, Codes}) ->
+                                    {[Topic|Topics], [Code|Codes]}
+                            end, {[], []}, TopicFilters),
+            ?LOG(warning, "Cannot subscribe ~p for ~p",
+                [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
             deliver({suback, PacketId, ReasonCodes}, PState)
     end;
 
@@ -585,7 +596,7 @@ deliver({disconnect, _ReasonCode}, PState) ->
 
 -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
 send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
-    trace(send, Packet, PState),
+    trace(send, Packet),
     case SendFun(Packet, #{version => Ver}) of
         ok ->
             emqx_metrics:sent(Packet),
@@ -759,7 +770,8 @@ check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
 check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
     case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
         allow -> ok;
-        deny  -> {error, ?RC_NOT_AUTHORIZED}
+        deny  ->
+            {error, ?RC_NOT_AUTHORIZED}
     end.
 
 run_check_steps([], _Packet, PState) ->
@@ -793,17 +805,22 @@ check_sub_acl(TopicFilters, PState) ->
               case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
                   allow -> {Ok, [{Topic, SubOpts}|Acc]};
                   deny  ->
-                      emqx_logger:warning([{client, PState#pstate.client_id}],
-                                          "ACL(~s) Cannot SUBSCRIBE ~p for ACL Deny",
-                                          [PState#pstate.client_id, Topic]),
                       {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
               end
       end, {ok, []}, TopicFilters).
 
-trace(recv, Packet, PState) ->
-    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], PState);
-trace(send, Packet, PState) ->
-    ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)], PState).
+trace(recv, Packet) ->
+    emqx_logger:debug(#{header => "[MQTT] RECV ~s", pck => Packet},
+                      #{report_cb =>
+                            fun(#{header := Fmt, pck := Pckt}) ->
+                                {Fmt, [emqx_packet:format(Pckt)]}
+                            end});
+trace(send, Packet) ->
+    emqx_logger:debug(#{header => "[MQTT] SEND ~s", pck => Packet},
+                      #{report_cb =>
+                            fun(#{header := Fmt, pck := Pckt}) ->
+                                {Fmt, [emqx_packet:format(Pckt)]}
+                            end}).
 
 inc_stats(recv, Type, PState = #pstate{recv_stats = Stats}) ->
     PState#pstate{recv_stats = inc_stats(Type, Stats)};
@@ -826,7 +843,7 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
     emqx_cm:unregister_connection(ClientId);
 shutdown(Reason, PState = #pstate{connected = true,
                                   client_id = ClientId}) ->
-    ?LOG(info, "Shutdown for ~p", [Reason], PState),
+    ?LOG(info, "Shutdown for ~p", [Reason]),
     emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
     emqx_cm:unregister_connection(ClientId).
 

+ 23 - 18
src/emqx_ws_connection.erl

@@ -45,9 +45,12 @@
 
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 
--define(WSLOG(Level, Format, Args, _State),
-        emqx_logger:Level("[MQTT/WS] " ++ Format, Args)).
-
+-define(WSLOG(Level, Format, Args),
+        emqx_logger:Level(#{header => "[WS] ", format => Format, args => Args},
+                          #{report_cb =>
+                                fun(#{header := Hdr0, format := Fmt0, args := Args0}) ->
+                                    {Hdr0 ++ Fmt0, Args0}
+                                end})).
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -135,6 +138,8 @@ websocket_init(#state{request = Req, options = Options}) ->
     EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
     IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
     lists:foreach(fun(Stat) -> put(Stat, 0) end, ?SOCK_STATS),
+
+    emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
     {ok, #state{peername     = Peername,
                 sockname     = Sockname,
                 parser_state = ParserState,
@@ -164,7 +169,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
                                                 proto_state  = ProtoState}) ->
     BinSize = iolist_size(Data),
     put(recv_oct, get(recv_oct) + BinSize),
-    ?WSLOG(debug, "RECV ~p", [Data], State),
+    ?WSLOG(debug, "RECV ~p", [Data]),
     emqx_metrics:inc('bytes/received', BinSize),
     case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
         {more, NewParserState} ->
@@ -176,7 +181,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
                 {ok, ProtoState1} ->
                     websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
                 {error, Error} ->
-                    ?WSLOG(error, "Protocol error - ~p", [Error], State),
+                    ?WSLOG(error, "Protocol error - ~p", [Error]),
                     stop(Error, State);
                 {error, Reason, ProtoState1} ->
                     shutdown(Reason, State#state{proto_state = ProtoState1});
@@ -184,10 +189,10 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
                     stop(Error, State#state{proto_state = ProtoState1})
             end;
         {error, Error} ->
-            ?WSLOG(error, "Frame error: ~p", [Error], State),
+            ?WSLOG(error, "Frame error: ~p", [Error]),
             stop(Error, State);
         {'EXIT', Reason} ->
-            ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State),
+            ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]),
             shutdown(parse_error, State)
     end.
 
@@ -225,12 +230,12 @@ websocket_info({timeout, Timer, emit_stats},
     {ok, State#state{stats_timer = undefined}, hibernate};
 
 websocket_info({keepalive, start, Interval}, State) ->
-    ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
+    ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval]),
     case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
         {ok, KeepAlive} ->
             {ok, State#state{keepalive = KeepAlive}};
         {error, Error} ->
-            ?WSLOG(warning, "Keepalive error - ~p", [Error], State),
+            ?WSLOG(warning, "Keepalive error - ~p", [Error]),
             shutdown(Error, State)
     end;
 
@@ -239,19 +244,19 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
         {ok, KeepAlive1} ->
             {ok, State#state{keepalive = KeepAlive1}};
         {error, timeout} ->
-            ?WSLOG(debug, "Keepalive Timeout!", [], State),
+            ?WSLOG(debug, "Keepalive Timeout!", []),
             shutdown(keepalive_timeout, State);
         {error, Error} ->
-            ?WSLOG(warning, "Keepalive error - ~p", [Error], State),
+            ?WSLOG(warning, "Keepalive error - ~p", [Error]),
             shutdown(keepalive_error, State)
     end;
 
 websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
-    ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
+    ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid]),
     shutdown(discard, State);
 
 websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
-    ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
+    ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     shutdown(conflict, State);
 
 websocket_info({binary, Data}, State) ->
@@ -261,14 +266,14 @@ websocket_info({shutdown, Reason}, State) ->
     shutdown(Reason, State);
 
 websocket_info(Info, State) ->
-    ?WSLOG(error, "unexpected info: ~p", [Info], State),
+    ?WSLOG(error, "unexpected info: ~p", [Info]),
     {ok, State}.
 
-terminate(SockError, _Req, State = #state{keepalive   = Keepalive,
-                                          proto_state = ProtoState,
-                                          shutdown    = Shutdown}) ->
+terminate(SockError, _Req, #state{keepalive   = Keepalive,
+                                  proto_state = ProtoState,
+                                  shutdown    = Shutdown}) ->
     ?WSLOG(debug, "Terminated for ~p, sockerror: ~p",
-           [Shutdown, SockError], State),
+           [Shutdown, SockError]),
     emqx_keepalive:cancel(Keepalive),
     case {ProtoState, Shutdown} of
         {undefined, _} -> ok;