瀏覽代碼

New logger formatter with meta-data

terry-xiaoyu 7 年之前
父節點
當前提交
94dbdffd59

+ 5 - 8
Makefile

@@ -4,30 +4,27 @@ PROJECT = emqx
 PROJECT_DESCRIPTION = EMQ X Broker
 PROJECT_VERSION = 3.0
 
-DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique
+DEPS = jsx gproc gen_rpc ekka esockd cowboy clique
 
 dep_jsx     = git https://github.com/talentdeficit/jsx 2.9.0
 dep_gproc   = git https://github.com/uwiger/gproc 0.8.0
-dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
-dep_lager   = git https://github.com/erlang-lager/lager 3.6.5
+dep_gen_rpc = git https://github.com/emqx/gen_rpc switch_to_logger
 dep_esockd  = git https://github.com/emqx/esockd v5.4.2
-dep_ekka    = git https://github.com/emqx/ekka v0.4.1
+dep_ekka    = git https://github.com/emqx/ekka switch_to_logger
 dep_cowboy  = git https://github.com/ninenines/cowboy 2.4.0
 dep_clique  = git https://github.com/emqx/clique develop
 
 NO_AUTOPATCH = cuttlefish
 
 ERLC_OPTS += +debug_info -DAPPLICATION=emqx
-ERLC_OPTS += +'{parse_transform, lager_transform}'
 
 BUILD_DEPS = cuttlefish
-dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
+dep_cuttlefish = git https://github.com/emqx/cuttlefish switch_to_logger
 
 #TEST_DEPS = emqx_ct_helplers
 #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
 
 TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
-TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
 
 EUNIT_OPTS = verbose
 
@@ -46,7 +43,7 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
 
 COVER = true
 
-PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia
+PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia
 DIALYZER_DIRS := ebin/
 DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
 

+ 8 - 16
etc/emqx.conf

@@ -338,14 +338,14 @@ rpc.socket_keepalive_count = 9
 ## - both: write logs both to file and standard I/O
 log.to = console
 
-## Sets the log severity level.
+## The log severity level.
 ##
 ## Value: debug | info | notice | warning | error | critical | alert | emergency
 ##
 ## Default: error
 log.level = error
 
-## Sets the dir for log files.
+## The dir for log files.
 ##
 ## Value: Folder
 log.dir = {{ platform_log_dir }}
@@ -360,16 +360,16 @@ log.file = emqx.log
 ##
 ## Value: Number
 ## Default: 10M
-## Supported Unit: B | KB | MB | G
-#log.rotation.size = 10M
+## Supported Unit: KB | MB | G
+log.rotation.size = 10MB
 
 ## Maximum rotation count of log files.
 ##
 ## Value: Number
 ## Default: 5
-#log.rotation.count = 5
+log.rotation.count = 5
 
-## To create additional log files for specific levels of logs.
+## To create additional log files for specific log levels.
 ##
 ## Value: File Name
 ## Format: log.$level.file = $filename,
@@ -377,17 +377,9 @@ log.file = emqx.log
 ##                                       error, critical, alert, emergency
 ## Note: Log files for a specific log level will contain all the logs
 ##       that greater than or equal to that level
-log.error.file = error.log
-
-## Enable syslog, to write logs to the rsyslog
-##
-## Values: on | off
-log.syslog = on
-
-## Sets the severity level for syslog.
 ##
-## Value: debug | info | notice | warning | error | critical | alert | emergency
-log.syslog.level = error
+#log.info.file = info.log
+#log.error.file = error.log
 
 ##--------------------------------------------------------------------
 ## Authentication/Access Control

+ 71 - 113
priv/emqx.schema

@@ -382,7 +382,7 @@ end}.
 %% Log
 %%--------------------------------------------------------------------
 
-{mapping, "log.to", "logger.to", [
+{mapping, "log.to", "kernel.logger", [
   {default, console},
   {datatype, {enum, [off, file, console, both]}}
 ]}.
@@ -392,142 +392,100 @@ end}.
   {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
 ]}.
 
-{mapping, "log.dir", "logger.dir", [
+{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
+{mapping, "log.dir", "kernel.logger", [
   {default, "log"},
   {datatype, string}
 ]}.
 
-{mapping, "log.file", "logger.filename", [
+{mapping, "log.file", "kernel.logger", [
   {default, "emqx.log"},
   {datatype, file}
 ]}.
 
-{mapping, "log.rotation.size", "logger.size", [
-  {default, "10M"},
+{mapping, "log.rotation.size", "kernel.logger", [
+  {default, "10MB"},
   {datatype, bytesize}
 ]}.
 
-{mapping, "log.rotation.count", "logger.count", [
+{mapping, "log.rotation.count", "kernel.logger", [
   {default, 5},
   {datatype, integer}
 ]}.
 
-{mapping, "log.$level.file", "logger.additional_handlers", [
+{mapping, "log.$level.file", "kernel.logger", [
   {datatype, file}
 ]}.
 
-{mapping, "log.syslog", "logger.syslog", [
-  {default,  off},
-  {datatype, flag}
-]}.
-
-{mapping, "log.syslog.identity", "logger.syslog", [
-  {default, "emqx"},
-  {datatype, string}
-]}.
-
-{mapping, "log.syslog.facility", "logger.syslog", [
-  {default, local0},
-  {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}}
-]}.
-
-{mapping, "log.syslog.level", "logger.syslog", [
-  {default, error},
-  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
-]}.
-
-{translation,
- "logger.additional_handlers",
- fun(Conf) ->
-    Format = [time," [",level,"] ",msg,"\n"],
-    AdditionalHandlers = lists:filter(
-                  fun({K, _V}) ->
-                    cuttlefish_variable:is_fuzzy_match(K, string:tokens("log.$level.file", "."))
-                  end,
-               Conf),
-      Users = [{handler, file, logger_disk_log_h,
-                  #{level => Level,
-                    config => #{file => Dir ++ Filename,
-                                type => wrap,
-                                max_no_files => MaxNoFiles,
-                                max_no_bytes => MaxNoBytes},
-                    formatter =>
-                        {logger_formatter, #{legacy_header => false,
-                                             single_line => true,
-                                             template => Format}},
-                    filesync_repeat_interval => 1000}}
-                || {[_, Level, _], Filename} <- UserList],
-      case Users of
-          [] ->
-              throw(unset);
-          _ -> Users
-      end
-
-    ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf, undefined) of
-      undefined -> [];
-      ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
-                                              {level, error},
-                                              {size, cuttlefish:conf_get("log.error.size", Conf)},
-                                              {date, "$D0"},
-                                              {count, cuttlefish:conf_get("log.error.count", Conf)}]}]
-    end,
-
-    InfoHandler = case cuttlefish:conf_get("log.info.file", Conf, undefined) of
-      undefined -> [];
-      InfoFilename -> [{lager_file_backend, [{file, InfoFilename},
-                                             {level, info},
-                                             {size, cuttlefish:conf_get("log.info.size", Conf)},
-                                             {date, "$D0"},
-                                             {count, cuttlefish:conf_get("log.info.count", Conf)}]}]
-    end,
-
-    ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
-    ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
-
-    ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]},
-    ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
-                                               {level, ConsoleLogLevel},
-                                               {size, cuttlefish:conf_get("log.console.size", Conf)},
-                                               {date, "$D0"},
-                                               {count, cuttlefish:conf_get("log.console.count", Conf)}]},
-
-    ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
-      off -> [];
-      file -> [ConsoleFileHandler];
-      console -> [ConsoleHandler];
-      both -> [ConsoleHandler, ConsoleFileHandler];
-      _ -> []
-    end,
-    ConsoleHandlers ++ ErrorHandler ++ InfoHandler
-  end
-}.
-
-{mapping, "log.crash", "lager.crash_log", [
-  {default, on},
-  {datatype, flag}
-]}.
-
-{mapping, "log.crash.file", "lager.crash_log", [
-  {default, "log/crash.log"},
-  {datatype, file}
-]}.
-
-{translation,
- "lager.crash_log",
- fun(Conf) ->
-     case cuttlefish:conf_get("log.crash", Conf) of
-         false -> undefined;
-         _ ->
-             cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
-     end
- end}.
-
 {mapping, "sasl", "sasl.sasl_error_logger", [
   {default, off},
   {datatype, flag},
   hidden
 ]}.
 
+{translation, "kernel.logger", fun(Conf) ->
+    LogTo = cuttlefish:conf_get("log.to", Conf),
+    Formatter = {emqx_logger_formatter,
+                  #{template =>
+                      [time," [",level,"] ",
+                       {client_id,
+                          [{peername,
+                              [client_id,"@",peername," "],
+                              [client_id, " "]}],
+                          []},
+                       msg,"\n"]}},
+    FileConf =  fun(Filename) ->
+                  #{type => wrap,
+                    file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
+                    max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
+                    max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
+                end,
+
+    %% For the default logger that outputs to console
+    DefaultHandler =
+        if LogTo =:= console orelse LogTo =:= both ->
+                [{handler, default, logger_std_h,
+                    #{level => all,
+                      config => #{type => standard_io},
+                      formatter => Formatter}}];
+           true ->
+                [{handler, default, undefined}]
+        end,
+
+    %% For the file logger
+    FileHandler =
+        if LogTo =:= file orelse LogTo =:= both ->
+              [{handler, file, logger_disk_log_h,
+                    #{level => all,
+                      config => FileConf(cuttlefish:conf_get("log.file", Conf)),
+                      formatter => Formatter,
+                      filesync_repeat_interval => 1000}}];
+           true -> []
+        end,
+
+    %% For creating additional log files for specific log levels.
+    AdditionalLogFiles =
+        if LogTo =:= file orelse LogTo =:= both ->
+              lists:filter(fun({K, V}) ->
+                  cuttlefish_variable:is_fuzzy_match(K, string:tokens("log.$level.file", "."))
+                end, Conf);
+           true -> []
+        end,
+    AdditionalHandlers =
+        [{handler, list_to_atom("file_for_"++Level), logger_disk_log_h,
+            #{level => list_to_atom(Level),
+              config => FileConf(Filename),
+              formatter => Formatter,
+              filesync_repeat_interval => 1000}}
+          || {[_, Level, _], Filename} <- AdditionalLogFiles],
+
+    _AllHandlers = DefaultHandler ++ FileHandler ++ AdditionalHandlers
+end}.
+
 %%--------------------------------------------------------------------
 %% Authentication/ACL
 %%--------------------------------------------------------------------

+ 1 - 4
rebar.config

@@ -1,8 +1,6 @@
 {deps, [{jsx, "2.9.0"},
         {gproc, "0.8.0"},
-        {lager, "3.6.5"},
-        {cowboy, "2.4.0"},
-        {lager_syslog, {git, "https://github.com/basho/lager_syslog", {branch, "3.0.1"}}}
+        {cowboy, "2.4.0"}
        ]}.
 
 %% appended to deps in rebar.config.script
@@ -20,7 +18,6 @@
             warn_unused_import,
             warn_obsolete_guard,
             debug_info,
-            {parse_transform, lager_transform},
             {d, 'APPLICATION', emqx}]}.
 {xref_checks, [undefined_function_calls, undefined_functions,
                locals_not_used, deprecated_function_calls,

+ 2 - 2
src/emqx.app.src

@@ -3,8 +3,8 @@
               {vsn,"3.0"},
               {modules,[]},
               {registered,[emqx_sup]},
-              {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,
-                             cowboy,lager_syslog]},
+              {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,
+                             cowboy]},
               {env,[]},
               {mod,{emqx_app,[]}},
               {maintainers,["Feng Lee <feng@emqx.io>"]},

+ 2 - 2
src/emqx_app.erl

@@ -47,12 +47,12 @@ stop(_State) ->
 %%--------------------------------------------------------------------
 
 print_banner() ->
-    io:format("Starting ~s on node ~s~n", [?APP, node()]).
+    logger:info("Starting ~s on node ~s", [?APP, node()]).
 
 print_vsn() ->
     {ok, Descr} = application:get_key(description),
     {ok, Vsn} = application:get_key(vsn),
-    io:format("~s ~s is running now!~n", [Descr, Vsn]).
+    logger:info("~s ~s is running now!", [Descr, Vsn]).
 
 %%--------------------------------------------------------------------
 %% Autocluster

+ 2 - 2
src/emqx_bridge.erl

@@ -244,7 +244,7 @@ handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
 
 handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
                                                   reconnect_interval = ReconnectInterval}) ->
-    lager:warning("emqx bridge stop reason:~p", [Reason]),
+    logger:warning("emqx bridge stop reason:~p", [Reason]),
     erlang:send_after(ReconnectInterval, self(), start),
     {noreply, State#state{client_pid = undefined}};
 
@@ -306,7 +306,7 @@ format_mountpoint(Prefix) ->
 store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
     [Data | Queue];
 store(memory, _Data, Queue, _MaxPendingMsg) ->
-    lager:error("Beyond max pending messages"),
+    logger:error("Beyond max pending messages"),
     Queue;
 store(disk, Data, Queue, _MaxPendingMsg)->
     [Data | Queue].

+ 1 - 1
src/emqx_broker.erl

@@ -337,7 +337,7 @@ handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, sub
         true ->
             case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of
                 true ->
-                    io:format("Ets: ~p,  SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]),
+                    logger:info("Ets: ~p,  SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]),
                     gen_server:reply(From, ok),
                     {noreply, State};
                 false ->

+ 2 - 2
src/emqx_ctl.erl

@@ -87,8 +87,8 @@ lookup_command(Cmd) when is_atom(Cmd) ->
     end.
 
 usage() ->
-    io:format("Usage: ~s~n", [?MODULE]),
-    [begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end
+    logger:info("Usage: ~s", [?MODULE]),
+    [begin logger:info("~80..-s", [""]), Mod:Cmd(usage) end
      || {_, {Mod, Cmd}, _} <- ets:tab2list(?TAB)].
 
 %%------------------------------------------------------------------------------

+ 4 - 4
src/emqx_listeners.erl

@@ -33,9 +33,9 @@ start() ->
 start_listener({Proto, ListenOn, Options}) ->
     case start_listener(Proto, ListenOn, Options) of
         {ok, _} ->
-            io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
+            logger:info("Start mqtt:~s listener on ~s successfully.", [Proto, format(ListenOn)]);
         {error, Reason} ->
-            io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!",
+            io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!",
                       [Proto, format(ListenOn), Reason])
     end.
 
@@ -117,9 +117,9 @@ stop() ->
 stop_listener({Proto, ListenOn, Opts}) ->
     case stop_listener(Proto, ListenOn, Opts) of
         ok ->
-            io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
+            logger:info("Stop mqtt:~s listener on ~s successfully.", [Proto, format(ListenOn)]);
         {error, Reason} ->
-            io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.",
+            io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
                       [Proto, format(ListenOn), Reason])
     end.
 

+ 29 - 20
src/emqx_logger.erl

@@ -22,38 +22,47 @@
 -export([error/1, error/2, error/3]).
 -export([critical/1, critical/2, critical/3]).
 
+-export([add_proc_metadata/1]).
+
 debug(Msg) ->
-    lager:debug(Msg).
+    logger:debug(Msg).
 debug(Format, Args) ->
-    lager:debug(Format, Args).
-debug(Metadata, Format, Args) when is_list(Metadata) ->
-    lager:debug(Metadata, Format, Args).
+    logger:debug(Format, Args).
+debug(Metadata, Format, Args) when is_map(Metadata) ->
+    logger:debug(Format, Args, Metadata).
 
 info(Msg) ->
-    lager:info(Msg).
+    logger:info(Msg).
 info(Format, Args) ->
-    lager:info(Format, Args).
-info(Metadata, Format, Args) when is_list(Metadata) ->
-    lager:info(Metadata, Format, Args).
+    logger:info(Format, Args).
+info(Metadata, Format, Args) when is_map(Metadata) ->
+    logger:info(Format, Args, Metadata).
 
 warning(Msg) ->
-    lager:warning(Msg).
+    logger:warning(Msg).
 warning(Format, Args) ->
-    lager:warning(Format, Args).
-warning(Metadata, Format, Args) when is_list(Metadata) ->
-    lager:warning(Metadata, Format, Args).
+    logger:warning(Format, Args).
+warning(Metadata, Format, Args) when is_map(Metadata) ->
+    logger:warning(Format, Args, Metadata).
 
 error(Msg) ->
-    lager:error(Msg).
+    logger:error(Msg).
 error(Format, Args) ->
-    lager:error(Format, Args).
-error(Metadata, Format, Args) when is_list(Metadata) ->
-    lager:error(Metadata, Format, Args).
+    logger:error(Format, Args).
+error(Metadata, Format, Args) when is_map(Metadata) ->
+    logger:error(Format, Args, Metadata).
 
 critical(Msg) ->
-    lager:critical(Msg).
+    logger:critical(Msg).
 critical(Format, Args) ->
-    lager:critical(Format, Args).
-critical(Metadata, Format, Args) when is_list(Metadata) ->
-    lager:critical(Metadata, Format, Args).
+    logger:critical(Format, Args).
+critical(Metadata, Format, Args) when is_map(Metadata) ->
+    logger:critical(Format, Args, Metadata).
 
+add_proc_metadata(Meta) ->
+    case logger:get_process_metadata() of
+        undefined ->
+            logger:set_process_metadata(Meta);
+        OldMeta ->
+            logger:set_process_metadata(maps:merge(OldMeta, Meta))
+    end.

+ 360 - 0
src/emqx_logger_formatter.erl

@@ -0,0 +1,360 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%% This file is copied from lib/kernel/src/logger_formatter.erl, and
+%% modified for a more concise time format other than the default RFC3339.
+
+-module(emqx_logger_formatter).
+
+-export([format/2]).
+-export([check_config/1]).
+
+-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]).
+
+-define(FormatP, "~0tp").
+
+-define(IS_STRING(String),
+        (is_list(String) orelse is_binary(String))).
+
+%%%-----------------------------------------------------------------
+%%% Types
+-type config() :: #{chars_limit     => pos_integer() | unlimited,
+                    depth           => pos_integer() | unlimited,
+                    max_size        => pos_integer() | unlimited,
+                    report_cb       => logger:report_cb(),
+                    quit        => template()}.
+-type template() :: [metakey() | {metakey(),template(),template()} | string()].
+-type metakey() :: atom() | [atom()].
+
+%%%-----------------------------------------------------------------
+%%% API
+-spec format(LogEvent,Config) -> unicode:chardata() when
+      LogEvent :: logger:log_event(),
+      Config :: config().
+format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
+  when is_map(Config0) ->
+    Config = add_default_config(Config0),
+    Template = maps:get(template,Config),
+    {BT,AT0} = lists:splitwith(fun(msg) -> false; (_) -> true end, Template),
+    {DoMsg,AT} =
+        case AT0 of
+            [msg|Rest] -> {true,Rest};
+            _ ->{false,AT0}
+        end,
+    B = do_format(Level,Meta,BT,Config),
+    A = do_format(Level,Meta,AT,Config),
+    MsgStr =
+        if DoMsg ->
+                Config1 =
+                    case maps:get(chars_limit,Config) of
+                        unlimited ->
+                            Config;
+                        Size0 ->
+                            Size =
+                                case Size0 - string:length([B,A]) of
+                                    S when S>=0 -> S;
+                                    _ -> 0
+                                end,
+                            Config#{chars_limit=>Size}
+                    end,
+                string:trim(format_msg(Msg0,Meta,Config1));
+           true ->
+                ""
+        end,
+    truncate([B,MsgStr,A],maps:get(max_size,Config)).
+
+do_format(Level,Data,[level|Format],Config) ->
+    [to_string(level,Level,Config)|do_format(Level,Data,Format,Config)];
+do_format(Level,Data,[{Key,IfExist,Else}|Format],Config) ->
+    String =
+        case value(Key,Data) of
+            {ok,Value} -> do_format(Level,Data#{Key=>Value},IfExist,Config);
+            error -> do_format(Level,Data,Else,Config)
+        end,
+    [String|do_format(Level,Data,Format,Config)];
+do_format(Level,Data,[Key|Format],Config)
+  when is_atom(Key) orelse
+       (is_list(Key) andalso is_atom(hd(Key))) ->
+    String =
+        case value(Key,Data) of
+            {ok,Value} -> to_string(Key,Value,Config);
+            error -> ""
+        end,
+    [String|do_format(Level,Data,Format,Config)];
+do_format(Level,Data,[Str|Format],Config) ->
+    [Str|do_format(Level,Data,Format,Config)];
+do_format(_Level,_Data,[],_Config) ->
+    [].
+
+value(Key,Meta) when is_map_key(Key,Meta) ->
+    {ok,maps:get(Key,Meta)};
+value([Key|Keys],Meta) when is_map_key(Key,Meta) ->
+    value(Keys,maps:get(Key,Meta));
+value([],Value) ->
+    {ok,Value};
+value(_,_) ->
+    error.
+
+to_string(time,Time,Config) ->
+    format_time(Time,Config);
+to_string(mfa,MFA,Config) ->
+    format_mfa(MFA,Config);
+to_string(_,Value,Config) ->
+    to_string(Value,Config).
+
+to_string(X,_) when is_atom(X) ->
+    atom_to_list(X);
+to_string(X,_) when is_integer(X) ->
+    integer_to_list(X);
+to_string(X,_) when is_pid(X) ->
+    pid_to_list(X);
+to_string(X,_) when is_reference(X) ->
+    ref_to_list(X);
+to_string(X,_) when is_list(X) ->
+    case printable_list(lists:flatten(X)) of
+        true -> X;
+        _ -> io_lib:format(?FormatP,[X])
+    end;
+to_string(X,_) ->
+    io_lib:format(?FormatP,[X]).
+
+printable_list([]) ->
+    false;
+printable_list(X) ->
+    io_lib:printable_list(X).
+
+format_msg({string,Chardata},Meta,Config) ->
+    format_msg({"~ts",[Chardata]},Meta,Config);
+format_msg({report,_}=Msg,Meta,#{report_cb:=Fun}=Config)
+  when is_function(Fun,1); is_function(Fun,2) ->
+    format_msg(Msg,Meta#{report_cb=>Fun},maps:remove(report_cb,Config));
+format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,1) ->
+    try Fun(Report) of
+        {Format,Args} when is_list(Format), is_list(Args) ->
+            format_msg({Format,Args},maps:remove(report_cb,Meta),Config);
+        Other ->
+            format_msg({"REPORT_CB/1 ERROR: ~0tp; Returned: ~0tp",
+                        [Report,Other]},Meta,Config)
+    catch C:R:S ->
+            format_msg({"REPORT_CB/1 CRASH: ~0tp; Reason: ~0tp",
+                        [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},Meta,Config)
+    end;
+format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,2) ->
+    try Fun(Report,maps:with([depth,chars_limit,single_line],Config)) of
+        Chardata when ?IS_STRING(Chardata) ->
+            try chardata_to_list(Chardata) % already size limited by report_cb
+            catch _:_ ->
+                    format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
+                                [Report,Chardata]},Meta,Config)
+            end;
+        Other ->
+            format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
+                        [Report,Other]},Meta,Config)
+    catch C:R:S ->
+            format_msg({"REPORT_CB/2 CRASH: ~0tp; Reason: ~0tp",
+                        [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},
+                       Meta,Config)
+    end;
+format_msg({report,Report},Meta,Config) ->
+    format_msg({report,Report},
+               Meta#{report_cb=>fun logger:format_report/1},
+               Config);
+format_msg(Msg,_Meta,#{depth:=Depth,chars_limit:=CharsLimit}) ->
+    Opts = chars_limit_to_opts(CharsLimit),
+    do_format_msg(Msg, Depth, Opts).
+
+chars_limit_to_opts(unlimited) -> [];
+chars_limit_to_opts(CharsLimit) -> [{chars_limit,CharsLimit}].
+
+do_format_msg({Format0,Args},Depth,Opts) ->
+    try
+        Format1 = io_lib:scan_format(Format0, Args),
+        Format = reformat(Format1, Depth),
+        io_lib:build_text(Format,Opts)
+    catch C:R:S ->
+            FormatError = "FORMAT ERROR: ~0tp - ~0tp",
+            case Format0 of
+                FormatError ->
+                    %% already been here - avoid failing cyclically
+                    erlang:raise(C,R,S);
+                _ ->
+                    format_msg({FormatError,[Format0,Args]},Depth,Opts)
+            end
+    end.
+
+reformat(Format,unlimited) ->
+    Format;
+reformat([#{control_char:=C}=M|T], Depth) when C =:= $p ->
+    [limit_depth(M#{width => 0}, Depth)|reformat(T, Depth)];
+reformat([#{control_char:=C}=M|T], Depth) when C =:= $P ->
+    [M#{width => 0}|reformat(T, Depth)];
+reformat([#{control_char:=C}=M|T], Depth) when C =:= $p; C =:= $w ->
+    [limit_depth(M, Depth)|reformat(T, Depth)];
+reformat([H|T], Depth) ->
+    [H|reformat(T, Depth)];
+reformat([], _) ->
+    [].
+
+limit_depth(M0, unlimited) ->
+    M0;
+limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) ->
+    C = C0 - ($a - $A),				%To uppercase.
+    M0#{control_char:=C,args:=Args++[Depth]}.
+
+chardata_to_list(Chardata) ->
+    case unicode:characters_to_list(Chardata,unicode) of
+        List when is_list(List) ->
+            List;
+        Error ->
+            throw(Error)
+    end.
+
+truncate(String,unlimited) ->
+    String;
+truncate(String,Size) ->
+    Length = string:length(String),
+    if Length>Size ->
+            case lists:reverse(lists:flatten(String)) of
+                [$\n|_] ->
+                    string:slice(String,0,Size-4)++"...\n";
+                _ ->
+                    string:slice(String,0,Size-3)++"..."
+            end;
+       true ->
+            String
+    end.
+
+%% Convert microseconds-timestamp into local datatime string in milliseconds
+format_time(SysTime,#{})
+  when is_integer(SysTime) ->
+    Ms = SysTime rem 1000000 div 1000,
+    {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond),
+    format_time({Date, {H, Mi, S, Ms}}).
+format_time({{Y, M, D}, {H, Mi, S, Ms}}) ->
+    io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]);
+format_time({{Y, M, D}, {H, Mi, S}}) ->
+    io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]).
+
+format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) ->
+    atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A);
+format_mfa({M,F,A},Config) when is_atom(M), is_atom(F), is_list(A) ->
+    format_mfa({M,F,length(A)},Config);
+format_mfa(MFA,Config) ->
+    to_string(MFA,Config).
+
+%% Ensure that all valid configuration parameters exist in the final
+%% configuration map
+add_default_config(Config0) ->
+    Default =
+        #{chars_limit=>unlimited,
+          error_logger_notice_header=>info},
+    MaxSize = get_max_size(maps:get(max_size,Config0,undefined)),
+    Depth = get_depth(maps:get(depth,Config0,undefined)),
+    add_default_template(maps:merge(Default,Config0#{max_size=>MaxSize,
+                                                     depth=>Depth})).
+
+add_default_template(#{template:=_}=Config) ->
+    Config;
+add_default_template(Config) ->
+    Config#{template=>?DEFAULT_FORMAT_TEMPLATE_SINGLE}.
+
+get_max_size(undefined) ->
+    unlimited;
+get_max_size(S) ->
+    max(10,S).
+
+get_depth(undefined) ->
+    error_logger:get_format_depth();
+get_depth(S) ->
+    max(5,S).
+
+-spec check_config(Config) -> ok | {error,term()} when
+      Config :: config().
+check_config(Config) when is_map(Config) ->
+    do_check_config(maps:to_list(Config));
+check_config(Config) ->
+    {error,{invalid_formatter_config,?MODULE,Config}}.
+
+do_check_config([{Type,L}|Config]) when Type == chars_limit;
+                                        Type == depth;
+                                        Type == max_size ->
+    case check_limit(L) of
+        ok -> do_check_config(Config);
+        error -> {error,{invalid_formatter_config,?MODULE,{Type,L}}}
+    end;
+do_check_config([{error_logger_notice_header,ELNH}|Config]) when ELNH == info;
+                                                                 ELNH == notice ->
+    do_check_config(Config);
+do_check_config([{report_cb,RCB}|Config]) when is_function(RCB,1);
+                                               is_function(RCB,2) ->
+    do_check_config(Config);
+do_check_config([{template,T}|Config]) ->
+    case check_template(T) of
+        ok -> do_check_config(Config);
+        error -> {error,{invalid_formatter_template,?MODULE,T}}
+    end;
+
+do_check_config([C|_]) ->
+    {error,{invalid_formatter_config,?MODULE,C}};
+do_check_config([]) ->
+    ok.
+
+check_limit(L) when is_integer(L), L>0 ->
+    ok;
+check_limit(unlimited) ->
+    ok;
+check_limit(_) ->
+    error.
+
+check_template([Key|T]) when is_atom(Key) ->
+    check_template(T);
+check_template([Key|T]) when is_list(Key), is_atom(hd(Key)) ->
+    case lists:all(fun(X) when is_atom(X) -> true;
+                      (_) -> false
+                   end,
+                   Key) of
+        true ->
+            check_template(T);
+        false ->
+            error
+    end;
+check_template([{Key,IfExist,Else}|T])
+  when is_atom(Key) orelse
+       (is_list(Key) andalso is_atom(hd(Key))) ->
+    case check_template(IfExist) of
+        ok ->
+            case check_template(Else) of
+                ok ->
+                    check_template(T);
+                error ->
+                    error
+            end;
+        error ->
+            error
+    end;
+check_template([Str|T]) when is_list(Str) ->
+    case io_lib:printable_unicode_list(Str) of
+        true -> check_template(T);
+        false -> error
+    end;
+check_template([]) ->
+    ok;
+check_template(_) ->
+    error.

+ 1 - 1
src/emqx_modules.erl

@@ -21,7 +21,7 @@ load() ->
     lists:foreach(
       fun({Mod, Env}) ->
         ok = Mod:load(Env),
-        io:format("Load ~s module successfully.~n", [Mod])
+        logger:info("Load ~s module successfully.", [Mod])
       end, emqx_config:get_env(modules, [])).
 
 -spec(unload() -> ok).

+ 4 - 4
src/emqx_protocol.erl

@@ -72,9 +72,8 @@
 
 -define(NO_PROPS, undefined).
 
--define(LOG(Level, Format, Args, PState),
-        emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
-                          [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
+-define(LOG(Level, Format, Args, _PState),
+        emqx_logger:Level("[MQTT] " ++ Format, Args)).
 
 %%------------------------------------------------------------------------------
 %% Init
@@ -82,6 +81,7 @@
 
 -spec(init(map(), list()) -> state()).
 init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
+    emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
     Zone = proplists:get_value(zone, Options),
     #pstate{zone          =  Zone,
             sendfun       =  SendFun,
@@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET(
                                        client_id   = ClientId,
                                        username    = Username,
                                        password    = Password} = Connect), PState) ->
-
+    emqx_logger:add_proc_metadata(#{client_id => binary_to_list(ClientId)}),
     %% TODO: Mountpoint...
     %% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
     WillMsg = make_will_msg(Connect),

+ 3 - 3
src/emqx_session.erl

@@ -161,9 +161,8 @@
 
 -define(TIMEOUT, 60000).
 
--define(LOG(Level, Format, Args, State),
-        emqx_logger:Level([{client, State#state.client_id}],
-                          "Session(~s): " ++ Format, [State#state.client_id | Args])).
+-define(LOG(Level, Format, Args, _State),
+        emqx_logger:Level("[Session] " ++ Format, Args)).
 
 %% @doc Start a session proc.
 -spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
@@ -341,6 +340,7 @@ init([Parent, #{zone                := Zone,
                 max_inflight        := MaxInflight,
                 topic_alias_maximum := TopicAliasMaximum,
                 will_msg            := WillMsg}]) ->
+    emqx_logger:add_proc_metadata(#{client_id => ClientId}),
     process_flag(trap_exit, true),
     true = link(ConnPid),
     IdleTimout = get_env(Zone, idle_timeout, 30000),

+ 3 - 3
src/emqx_sys_mon.erl

@@ -25,9 +25,9 @@
 
 -define(SYSMON, ?MODULE).
 -define(LOG(Msg, ProcInfo),
-        emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
+        emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
 -define(LOG(Msg, ProcInfo, PortInfo),
-        emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
+        emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
 
 %% @doc Start system monitor
 -spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
@@ -130,7 +130,7 @@ handle_info({timeout, _Ref, reset}, State) ->
     {noreply, State#state{events = []}, hibernate};
 
 handle_info(Info, State) ->
-    lager:error("[SYSMON] unexpected Info: ~p", [Info]),
+    logger:error("[SYSMON] unexpected Info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, #state{timer = TRef}) ->

+ 3 - 3
src/emqx_tracer.erl

@@ -41,7 +41,7 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
     ignore;
 trace(publish, #message{from = From, topic = Topic, payload = Payload})
     when is_binary(From); is_atom(From) ->
-    emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
+    emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
 
 %%------------------------------------------------------------------------------
 %% Start/Stop trace
@@ -76,7 +76,7 @@ init([]) ->
     {ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}.
 
 handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
-    case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of
+    case catch logger:trace_file(LogFile, [Who], Level, ?OPTIONS) of
         {ok, exists} ->
             {reply, {error, already_exists}, State};
         {ok, Trace} ->
@@ -92,7 +92,7 @@ handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, tr
 handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
     case maps:find(Who, Traces) of
         {ok, {Trace, _LogFile}} ->
-            case lager:stop_trace(Trace) of
+            case logger:stop_trace(Trace) of
                 ok -> ok;
                 {error, Error} ->
                     emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error])

+ 2 - 3
src/emqx_ws_connection.erl

@@ -45,9 +45,8 @@
 
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 
--define(WSLOG(Level, Format, Args, State),
-        emqx_logger:Level("MQTT/WS(~s): " ++ Format,
-                          [esockd_net:format(State#state.peername) | Args])).
+-define(WSLOG(Level, Format, Args, _State),
+        emqx_logger:Level("[MQTT/WS] " ++ Format, Args)).
 
 %%------------------------------------------------------------------------------
 %% API

+ 0 - 15
test/emqx_SUITE_data/slave.config

@@ -33,21 +33,6 @@
            {large_heap,8388608},
            {busy_port,false},
            {busy_dist_port,true}]}]},
- {sasl,[{sasl_error_logger,false}]},
- {lager,
-     [{error_logger_hwm,1000},
-      {error_logger_redirect,true},
-      {log_dir,"{{ platform_log_dir }}"},
-      {handlers,
-          [{lager_console_backend,error},
-           {lager_file_backend,
-               [{file,"{{ platform_log_dir }}/error.log"},
-                {level,error},
-                {size,10485760},
-                {date,"$D0"},
-                {count,5}]},
-           {lager_syslog_backend,["emq",local0,error]}]},
-      {crash_log,"{{ platform_log_dir }}/crash.log"}]},
  {gen_rpc,
      [{socket_keepalive_count,2},
       {socket_keepalive_interval,5},

+ 2 - 2
test/ws_client.erl

@@ -52,10 +52,10 @@ init(_, _WSReq) ->
     {ok, #state{}}.
 
 websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) ->
-    lager:info("Client received frame~p", [Frame]),
+    logger:info("Client received frame~p", [Frame]),
     {ok, State#state{buffer = [Frame|Buffer]}};
 websocket_handle(Frame, _, State = #state{waiting = From}) ->
-    lager:info("Client received frame~p", [Frame]),
+    logger:info("Client received frame~p", [Frame]),
     From ! Frame,
     {ok, State#state{waiting = undefined}}.