Ver código fonte

Merge pull request #5836 from Spycsh/slog

chore: change to structured logging under apps/emqx/src
Zaiming (Stone) Shi 4 anos atrás
pai
commit
5f3b146f15

+ 27 - 8
apps/emqx/src/emqx_alarm.erl

@@ -239,11 +239,17 @@ handle_call({get_alarms, deactivated}, _From, State) ->
     {reply, Alarms, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{
+        msg => "unexpected_call",
+        call => Req
+    }),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected msg: ~p", [Msg]),
+    ?SLOG(error, #{
+        msg => "unexpected_msg",
+        payload => Msg
+    }),
     {noreply, State}.
 
 handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
@@ -253,11 +259,14 @@ handle_info({timeout, _TRef, delete_expired_deactivated_alarm},
     {noreply, State#state{timer = ensure_timer(TRef, Period)}};
 
 handle_info({update_timer, Period}, #state{timer = TRef} = State) ->
-    ?LOG(warning, "update the 'validity_period' timer to ~p", [Period]),
+    ?SLOG(warning, #{
+        msg => "update_the_validity_period_timer",
+        period => Period
+    }),
     {noreply, State#state{timer = ensure_timer(TRef, Period)}};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
@@ -323,8 +332,11 @@ deactivate_all_alarms() ->
 clear_table(TableName) ->
     case ekka_mnesia:clear_table(TableName) of
         {aborted, Reason} ->
-            ?LOG(warning, "Faile to clear table ~p reason: ~p",
-                 [TableName, Reason]);
+            ?SLOG(warning, #{
+                msg => "fail_to_clear_table",
+                table_name => TableName,
+                reason => Reason
+            });
         {atomic, ok} ->
             ok
     end.
@@ -354,10 +366,17 @@ delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
 do_actions(_, _, []) ->
     ok;
 do_actions(activate, Alarm = #activated_alarm{name = Name, message = Message}, [log | More]) ->
-    ?LOG(warning, "Alarm ~s is activated, ~s", [Name, Message]),
+    ?SLOG(warning, #{
+        msg => "alarm_is_activated",
+        name => Name,
+        message => Message
+    }),
     do_actions(activate, Alarm, More);
 do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) ->
-    ?LOG(warning, "Alarm ~s is deactivated", [Name]),
+    ?SLOG(warning, #{
+        msg => "alarm_is_deactivated",
+        name => Name
+    }),
     do_actions(deactivate, Alarm, More);
 do_actions(Operation, Alarm, [publish | More]) ->
     Topic = topic(Operation),

+ 8 - 6
apps/emqx/src/emqx_authentication.erl

@@ -349,9 +349,11 @@ initialize_authentication(ChainName, AuthenticatorsConfig) ->
             {ok, _} ->
                 ok;
             {error, Reason} ->
-                ?SLOG(error, #{msg => "failed to create authenticator",
-                               reason => Reason,
-                               authenticator => generate_id(AuthenticatorConfig)})
+                ?SLOG(error, #{
+                    msg => "failed_to_create_authenticator",
+                    authenticator => generate_id(AuthenticatorConfig),
+                    reason => Reason
+                })
         end
     end, CheckedConfig).
 
@@ -650,15 +652,15 @@ handle_call({list_users, ChainName, AuthenticatorID}, _From, State) ->
     reply(Reply, State);
 
 handle_call(Req, _From, State) ->
-    ?SLOG(error, #{msg => "unexpected call", req => Req}),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Req, State) ->
-    ?SLOG(error, #{msg => "unexpected cast", req => Req}),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Req}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?SLOG(error, #{msg => "unexpected info", info => Info}),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 6 - 3
apps/emqx/src/emqx_banned.erl

@@ -187,11 +187,14 @@ init([]) ->
     {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "unexpected msg: ~p", [Msg]),
+    ?SLOG(error, #{
+        msg => "unexpected_msg",
+        payload => Msg
+    }),
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
@@ -199,7 +202,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
     {noreply, ensure_expiry_timer(State), hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{expiry_timer := TRef}) ->

+ 27 - 9
apps/emqx/src/emqx_broker.erl

@@ -202,7 +202,10 @@ publish(Msg) when is_record(Msg, message) ->
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
         #message{headers = #{allow_publish := false}} ->
-            ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
+            ?SLOG(notice, #{
+                msg => "stop_publishing",
+                payload => emqx_message:format(Msg)
+            }),
             [];
         Msg1 = #message{topic = Topic} ->
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
@@ -215,8 +218,12 @@ safe_publish(Msg) when is_record(Msg, message) ->
         publish(Msg)
     catch
         _:Error:Stk->
-            ?LOG(error, "Publish error: ~0p~n~s~n~0p",
-                 [Error, emqx_message:format(Msg), Stk]),
+            ?SLOG(error,#{
+                msg => "publishing_error",
+                error => Error,
+                payload => Msg,
+                stacktrace => Stk
+            }),
             []
     end.
 
@@ -266,14 +273,22 @@ forward(Node, To, Delivery, async) ->
     case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
         true -> emqx_metrics:inc('messages.forward');
         {badrpc, Reason} ->
-            ?LOG(error, "Ansync forward msg to ~s failed due to ~p", [Node, Reason]),
+            ?SLOG(error, #{
+                msg => "async_forward_msg_to_node_failed",
+                node => Node,
+                reason => Reason
+            }),
             {error, badrpc}
     end;
 
 forward(Node, To, Delivery, sync) ->
     case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
         {badrpc, Reason} ->
-            ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]),
+            ?SLOG(error, #{
+                msg => "sync_forward_msg_to_node_failed",
+                node => Node,
+                reason => Reason
+            }),
             {error, badrpc};
         Result ->
             emqx_metrics:inc('messages.forward'), Result
@@ -450,14 +465,17 @@ handle_call({subscribe, Topic, I}, _From, State) ->
     {reply, Ok, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({subscribe, Topic}, State) ->
     case emqx_router:do_add_route(Topic) of
         ok -> ok;
         {error, Reason} ->
-            ?LOG(error, "Failed to add route: ~p", [Reason])
+            ?SLOG(error, #{
+                msg => "failed_to_add_route",
+                reason => Reason
+            })
     end,
     {noreply, State};
 
@@ -481,11 +499,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
     {noreply, State};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{pool := Pool, id := Id}) ->

+ 3 - 3
apps/emqx/src/emqx_broker_helper.erl

@@ -118,7 +118,7 @@ init([]) ->
     {ok, #{pmon => emqx_pmon:new()}}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
@@ -127,7 +127,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
     {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
@@ -138,7 +138,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
     {noreply, State#{pmon := PMon1}};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 63 - 26
apps/emqx/src/emqx_channel.erl

@@ -373,11 +373,17 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
             ok = after_message_acked(ClientInfo, Msg, Properties),
             handle_out(publish, Publishes, Channel#channel{session = NSession});
         {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
-            ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
+            ?SLOG(warning, #{
+                msg => "puback_packetId_inuse",
+                packetId => PacketId
+            }),
             ok = emqx_metrics:inc('packets.puback.inuse'),
             {ok, Channel};
         {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?LOG(warning, "The PUBACK PacketId ~w is not found.", [PacketId]),
+            ?SLOG(warning, #{
+                msg => "puback_packetId_not_found",
+                packetId => PacketId
+            }),
             ok = emqx_metrics:inc('packets.puback.missed'),
             {ok, Channel}
     end;
@@ -390,11 +396,11 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
             NChannel = Channel#channel{session = NSession},
             handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
-            ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
+            ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.pubrec.inuse'),
             handle_out(pubrel, {PacketId, RC}, Channel);
         {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
+            ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.pubrec.missed'),
             handle_out(pubrel, {PacketId, RC}, Channel)
     end;
@@ -405,7 +411,7 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
             NChannel = Channel#channel{session = NSession},
             handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
+            ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.pubrel.missed'),
             handle_out(pubcomp, {PacketId, RC}, Channel)
     end;
@@ -420,7 +426,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
             ok = emqx_metrics:inc('packets.pubcomp.inuse'),
             {ok, Channel};
         {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
+            ?SLOG(warning, #{msg => "pubcomp_packetId_not_found", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.pubcomp.missed'),
             {ok, Channel}
     end;
@@ -501,11 +507,17 @@ handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState})
     handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel);
 
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?LOG(error, "Unexpected frame error: ~p", [Reason]),
+    ?SLOG(error, #{
+        msg => "malformed_mqtt_message",
+        reason => Reason
+    }),
     {ok, Channel};
 
 handle_in(Packet, Channel) ->
-    ?LOG(error, "Unexpected incoming: ~p", [Packet]),
+    ?SLOG(error, #{
+        msg => "disconnecting_due_to_unexpected_message",
+        packet => Packet
+    }),
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel).
 
 %%--------------------------------------------------------------------
@@ -529,7 +541,10 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
         {error, client_id_unavailable} ->
             handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
         {error, Reason} ->
-            ?LOG(error, "Failed to open session due to ~p", [Reason]),
+            ?SLOG(error, #{
+                msg => "failed_to_open_session",
+                reason => Reason
+            }),
             handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
     end.
 
@@ -548,8 +563,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
             Msg = packet_to_message(NPacket, NChannel),
             do_publish(PacketId, Msg, NChannel);
         {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
-            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
-                 [Topic, emqx_reason_codes:text(Rc)]),
+            ?SLOG(warning, #{
+                msg => "cannot_publish_to_topic",
+                topic => Topic,
+                reason => emqx_reason_codes:name(Rc)
+            }),
             case emqx:get_config([authorization, deny_action], ignore) of
                 ignore ->
                     case QoS of
@@ -563,8 +581,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
                     handle_out(disconnect, Rc, NChannel)
             end;
         {error, Rc = ?RC_QUOTA_EXCEEDED, NChannel} ->
-            ?LOG(warning, "Cannot publish messages to ~s due to ~s.",
-                 [Topic, emqx_reason_codes:text(Rc)]),
+            ?SLOG(warning, #{
+                msg => "cannot_publish_to_topic",
+                topic => Topic,
+                reason => emqx_reason_codes:name(Rc)
+            }),
             case QoS of
                 ?QOS_0 ->
                     ok = emqx_metrics:inc('packets.publish.dropped'),
@@ -575,8 +596,11 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
                     handle_out(pubrec, {PacketId, Rc}, NChannel)
             end;
         {error, Rc, NChannel} ->
-            ?LOG(warning, "Cannot publish message to ~s due to ~s.",
-                 [Topic, emqx_reason_codes:text(Rc)]),
+            ?SLOG(warning, #{
+                msg => "cannot_publish_to_topic",
+                topic => Topic,
+                reason => emqx_reason_codes:name(Rc)
+            }),
             handle_out(disconnect, Rc, NChannel)
     end.
 
@@ -621,8 +645,11 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
             ok = emqx_metrics:inc('packets.publish.inuse'),
             handle_out(pubrec, {PacketId, RC}, Channel);
         {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
-            ?LOG(warning, "Dropped the qos2 packet ~w "
-                 "due to awaiting_rel is full.", [PacketId]),
+            ?SLOG(warning, #{
+                msg => "dropped_qos2_packet",
+                reason => emqx_reason_codes:name(RC),
+                packetId => PacketId
+            }),
             ok = emqx_metrics:inc('packets.publish.dropped'),
             handle_out(pubrec, {PacketId, RC}, Channel)
     end.
@@ -671,8 +698,10 @@ process_subscribe([Topic = {TopicFilter, SubOpts}|More], SubProps, Channel, Acc)
                                                   Channel),
             process_subscribe(More, SubProps, NChannel, [{Topic, ReasonCode} | Acc]);
         {error, ReasonCode} ->
-            ?LOG(warning, "Cannot subscribe ~s due to ~s.",
-                 [TopicFilter, emqx_reason_codes:text(ReasonCode)]),
+            ?SLOG(warning, #{
+                msg => "cannot_subscribe_topic_filter",
+                reason => emqx_reason_codes:name(ReasonCode)
+            }),
             process_subscribe(More, SubProps, Channel, [{Topic, ReasonCode} | Acc])
     end.
 
@@ -685,8 +714,10 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
         {ok, NSession} ->
             {QoS, Channel#channel{session = NSession}};
         {error, RC} ->
-            ?LOG(warning, "Cannot subscribe ~s due to ~s.",
-                 [TopicFilter, emqx_reason_codes:text(RC)]),
+            ?SLOG(warning, #{
+                msg => "cannot_subscribe_topic_filter",
+                reason => emqx_reason_codes:text(RC)
+            }),
             {RC, Channel}
     end.
 
@@ -869,7 +900,7 @@ handle_out(auth, {ReasonCode, Properties}, Channel) ->
     {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
 
 handle_out(Type, Data, Channel) ->
-    ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
+    ?SLOG(error, #{msg => "unexpected_outgoing", type => Type, data => Data}),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------
@@ -964,7 +995,7 @@ handle_call({quota, Policy}, Channel) ->
     reply(ok, Channel#channel{quota = Quota});
 
 handle_call(Req, Channel) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     reply(ignored, Channel).
 
 %%--------------------------------------------------------------------
@@ -1004,7 +1035,10 @@ handle_info({sock_closed, Reason}, Channel =
     end;
 
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
-    ?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
+    ?SLOG(error, #{
+        msg => "unexpected_sock_closed",
+        reason => Reason
+    }),
     {ok, Channel};
 
 handle_info(clean_authz_cache, Channel) ->
@@ -1012,7 +1046,7 @@ handle_info(clean_authz_cache, Channel) ->
     {ok, Channel};
 
 handle_info(Info, Channel) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------
@@ -1075,7 +1109,10 @@ handle_timeout(_TRef, expire_quota_limit, Channel) ->
     {ok, clean_timer(quota_timer, Channel)};
 
 handle_timeout(_TRef, Msg, Channel) ->
-    ?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
+    ?SLOG(error, #{
+        msg => "unexpected_timeout",
+        payload => Msg
+    }),
     {ok, Channel}.
 
 %%--------------------------------------------------------------------

+ 12 - 5
apps/emqx/src/emqx_cm.erl

@@ -276,7 +276,10 @@ takeover_session(ClientId) ->
             takeover_session(ClientId, ChanPid);
         ChanPids ->
             [ChanPid|StalePids] = lists:reverse(ChanPids),
-            ?LOG(error, "More than one channel found: ~p", [ChanPids]),
+            ?SLOG(error, #{
+                msg => "more_than_one_channel_found",
+                chan_pids => ChanPids
+            }),
             lists:foreach(fun(StalePid) ->
                                   catch discard_session(ClientId, StalePid)
                           end, StalePids),
@@ -341,7 +344,10 @@ kick_session(ClientId) ->
             kick_session(ClientId, ChanPid);
         ChanPids ->
             [ChanPid|StalePids] = lists:reverse(ChanPids),
-            ?LOG(error, "More than one channel found: ~p", [ChanPids]),
+            ?SLOG(error, #{
+                msg => "more_than_one_channel_found",
+                chan_pids => ChanPids
+            }),
             lists:foreach(fun(StalePid) ->
                                   catch discard_session(ClientId, StalePid)
                           end, StalePids),
@@ -416,7 +422,7 @@ init([]) ->
     {ok, #{chan_pmon => emqx_pmon:new()}}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
@@ -424,7 +430,7 @@ handle_cast({registered, {ClientId, ChanPid}}, State = #{chan_pmon := PMon}) ->
     {noreply, State#{chan_pmon := PMon1}};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
@@ -434,7 +440,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
     {noreply, State#{chan_pmon := PMon1}};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
+
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 3 - 3
apps/emqx/src/emqx_cm_registry.erl

@@ -114,11 +114,11 @@ init([]) ->
     {ok, #{}}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({membership, {mnesia, down, Node}}, State) ->
@@ -132,7 +132,7 @@ handle_info({membership, _Event}, State) ->
     {noreply, State};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 6 - 1
apps/emqx/src/emqx_config_handler.erl

@@ -117,7 +117,12 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
                 {error, Result}
         end
     catch Error:Reason:ST ->
-        ?LOG(error, "change_config failed: ~p", [{Error, Reason, ST}]),
+        ?SLOG(error, #{
+            msg => "change_config_failed",
+            error => Error,
+            reason => Reason,
+            st => ST
+        }),
         {error, Reason}
     end,
     {reply, Reply, State};

+ 23 - 9
apps/emqx/src/emqx_connection.erl

@@ -417,14 +417,20 @@ handle_msg({'$gen_cast', Req}, State) ->
     {ok, NewState};
 
 handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
-    ?LOG(debug, "RECV ~0p", [Data]),
+    ?SLOG(debug, #{
+        msg => "RECV_data",
+        data => Data
+    }),
     Oct = iolist_size(Data),
     inc_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
     parse_incoming(Data, State);
 
 handle_msg({quic, Data, _Sock, _, _, _}, State) ->
-    ?LOG(debug, "RECV ~0p", [Data]),
+    ?SLOG(debug, #{
+        msg => "RECV_data",
+        data => Data
+    }),
     Oct = iolist_size(Data),
     inc_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
@@ -489,7 +495,7 @@ handle_msg({connack, ConnAck}, State) ->
     handle_outgoing(ConnAck, State);
 
 handle_msg({close, Reason}, State) ->
-    ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
+    ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}),
     handle_info({sock_closed, Reason}, close_socket(State));
 
 handle_msg({event, connected}, State = #state{channel = Channel}) ->
@@ -672,7 +678,10 @@ next_incoming_msgs(Packets) ->
 
 handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
     ok = inc_incoming_stats(Packet),
-    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
+    ?SLOG(debug, #{
+        msg => "RECV_packet",
+        packet => Packet
+    }),
     with_channel(handle_in, [Packet], State);
 
 handle_incoming(FrameError, State) ->
@@ -708,12 +717,17 @@ handle_outgoing(Packet, State) ->
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
         try emqx_frame:serialize_pkt(Packet, Serialize) of
-            <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
-                         [emqx_packet:format(Packet)]),
+            <<>> -> ?SLOG(warning, #{
+                        msg => "packet_is_discarded_because_the frame_is_too_large",
+                        packet => emqx_packet:format(Packet)
+                    }),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped'),
                     <<>>;
-            Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+            Data -> ?SLOG(debug, #{
+                        msg => "SEND_packet",
+                        packet => emqx_packet:format(Packet)
+                    }),
                     ok = inc_outgoing_stats(Packet),
                     Data
         catch
@@ -763,7 +777,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
 
 handle_info({sock_error, Reason}, State) ->
     case Reason =/= closed andalso Reason =/= einval of
-        true -> ?LOG(warning, "socket_error: ~p", [Reason]);
+        true -> ?SLOG(warning, #{msg => "socket_error", reason => Reason});
         false -> ok
     end,
     handle_info({sock_closed, Reason}, close_socket(State));
@@ -805,7 +819,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
         {ok, Limiter1} ->
             State#state{limiter = Limiter1};
         {pause, Time, Limiter1} ->
-            ?LOG(warning, "Pause ~pms due to rate limit", [Time]),
+            ?SLOG(warning, #{msg => "pause_time_due_to_rate_limit", time_in_ms => Time}),
             TRef = start_timer(Time, limit_timeout),
             State#state{sockstate   = blocked,
                         limiter     = Limiter1,

+ 4 - 4
apps/emqx/src/emqx_ctl.erl

@@ -185,13 +185,13 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
     case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
         [] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
         [[OriginSeq] | _] ->
-            ?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]),
+            ?SLOG(warning, #{msg => "CMD is overidden", cmd => Cmd, mf => MF}),
             true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
     end,
     {reply, ok, next_seq(State)};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({unregister_command, Cmd}, State) ->
@@ -199,11 +199,11 @@ handle_cast({unregister_command, Cmd}, State) ->
     noreply(State);
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     noreply(State).
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     noreply(State).
 
 terminate(_Reason, _State) ->

+ 17 - 7
apps/emqx/src/emqx_flapping.erl

@@ -106,7 +106,7 @@ init([]) ->
     {ok, #{}, hibernate}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({detected, #flapping{clientid   = ClientId,
@@ -116,8 +116,13 @@ handle_cast({detected, #flapping{clientid   = ClientId,
              #{window_time := WindTime, ban_time := Interval}}, State) ->
     case now_diff(StartedAt) < WindTime of
         true -> %% Flapping happened:(
-            ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms",
-                 [ClientId, inet:ntoa(PeerHost), DetectCnt, WindTime]),
+            ?SLOG(error, #{
+                msg => "flapping_detected",
+                client_id => ClientId,
+                peer_host => inet:ntoa(PeerHost),
+                detect_cnt => DetectCnt,
+                wind_time_in_ms => WindTime
+            }),
             Now = erlang:system_time(second),
             Banned = #banned{who    = {clientid, ClientId},
                              by     = <<"flapping detector">>,
@@ -126,13 +131,18 @@ handle_cast({detected, #flapping{clientid   = ClientId,
                              until  = Now + (Interval div 1000)},
             emqx_banned:create(Banned);
         false ->
-            ?LOG(warning, "~s(~s) disconnected ~w times in ~wms",
-                 [ClientId, inet:ntoa(PeerHost), DetectCnt, Interval])
+            ?SLOG(warning, #{
+                msg => "client_disconnected",
+                client_id => ClientId,
+                peer_host => inet:ntoa(PeerHost),
+                detect_cnt => DetectCnt,
+                interval => Interval
+            })
     end,
     {noreply, State};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
@@ -144,7 +154,7 @@ handle_info({timeout, _TRef, {garbage_collect, Zone}}, State) ->
     {noreply, State, hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 8 - 4
apps/emqx/src/emqx_hooks.erl

@@ -206,7 +206,11 @@ safe_execute({M, F, A}, Args) ->
         Result -> Result
     catch
         Error:Reason:Stacktrace ->
-            ?LOG(error, "Failed to execute ~0p: ~0p", [{M, F, A}, {Error, Reason, Stacktrace}]),
+            ?SLOG(error, #{
+                msg => "failed_to_execute",
+                module_function_arity => {M, F, A},
+                error_reason_stacktrace => {Error, Reason, Stacktrace}
+            }),
             ok
     end.
 
@@ -246,7 +250,7 @@ handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, S
     {reply, Reply, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({del, HookPoint, Action}, State) ->
@@ -259,11 +263,11 @@ handle_cast({del, HookPoint, Action}, State) ->
     {noreply, State};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected msg: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 9 - 5
apps/emqx/src/emqx_metrics.erl

@@ -442,13 +442,17 @@ init([]) ->
     {ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
 
 handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
-    ?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]),
+    ?SLOG(error, #{
+        msg => "failed_to_create_type_name_for_index_exceeded",
+        type => Type,
+        name => Name
+    }),
     {reply, {error, metric_index_exceeded}, State};
 
 handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
     case ets:lookup(?TAB, Name) of
         [#metric{idx = Idx}] ->
-            ?LOG(info, "~s already exists.", [Name]),
+            ?SLOG(info, #{msg => "name_already_exists", name => Name}),
             {reply, {ok, Idx}, State};
         [] ->
             Metric = #metric{name = Name, type = Type, idx = NextIdx},
@@ -464,15 +468,15 @@ handle_call({set_type_to_counter, Keys}, _From, State) ->
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 8 - 2
apps/emqx/src/emqx_os_mon.erl

@@ -87,7 +87,10 @@ handle_call(Req, _From, State) ->
     {reply, {error, {unexpected_call, Req}}, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "unexpected_cast_discarded: ~p", [Msg]),
+    ?SLOG(error, #{
+        msg => "unexpected_cast_discarded",
+        payload => Msg
+    }),
     {noreply, State}.
 
 handle_info({timeout, _Timer, check}, State) ->
@@ -109,7 +112,10 @@ handle_info({timeout, _Timer, check}, State) ->
     {noreply, State};
 
 handle_info(Info, State) ->
-    ?LOG(info, "unexpected_info_discarded: ~p", [Info]),
+    ?SLOG(info, #{
+        msg => "unexpected_info_discarded",
+        info => Info
+    }),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 68 - 15
apps/emqx/src/emqx_plugins.erl

@@ -50,10 +50,16 @@ load() ->
 load(PluginName) when is_atom(PluginName) ->
     case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
         {false, _} ->
-            ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
+            ?SLOG(alert, #{
+                msg => "plugin_not_found_cannot_load",
+                plugin_name => PluginName
+            }),
             {error, not_found};
         {_, true} ->
-            ?LOG(notice, "Plugin ~s is already started", [PluginName]),
+            ?SLOG(notice, #{
+                msg => "plugin_already_started",
+                plugin_name => PluginName
+            }),
             {error, already_started};
         {_, false} ->
             load_plugin(PluginName)
@@ -69,10 +75,16 @@ unload() ->
 unload(PluginName) when is_atom(PluginName) ->
     case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
         {false, _} ->
-            ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
+            ?SLOG(error, #{
+                msg => "plugin_not_found_cannot_load",
+                plugin_name => PluginName
+            }),
             {error, not_found};
         {_, false} ->
-            ?LOG(error, "Plugin ~s is not started", [PluginName]),
+            ?SLOG(error, #{
+                msg => "plugin_not_started",
+                plugin_name => PluginName
+            }),
             {error, not_started};
         {_, _} ->
             unload_plugin(PluginName)
@@ -81,7 +93,10 @@ unload(PluginName) when is_atom(PluginName) ->
 reload(PluginName) when is_atom(PluginName)->
     case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
         {false, _} ->
-            ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
+            ?SLOG(error, #{
+                msg => "plugin_not_found_cannot_load",
+                plugin_name => PluginName
+            }),
             {error, not_found};
         {_, false} ->
             load(PluginName);
@@ -127,14 +142,20 @@ load_ext_plugins(Dir) ->
         end, filelib:wildcard("*", Dir)).
 
 load_ext_plugin(PluginDir) ->
-    ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
+    ?SLOG(debug, #{
+        msg => "loading_extra_plugin",
+        plugin_dir => PluginDir
+    }),
     Ebin = filename:join([PluginDir, "ebin"]),
     AppFile = filename:join([Ebin, "*.app"]),
     AppName = case filelib:wildcard(AppFile) of
                   [App] ->
                       list_to_atom(filename:basename(App, ".app"));
                   [] ->
-                      ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
+                      ?SLOG(alert, #{
+                          msg => "plugin_app_file_not_found",
+                          app_file => AppFile
+                      }),
                       error({plugin_app_file_not_found, AppFile})
               end,
     ok = load_plugin_app(AppName, Ebin).
@@ -185,7 +206,12 @@ load_plugin(Name) ->
                 {error, Error0}
         end
     catch _ : Error : Stacktrace ->
-        ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
+        ?SLOG(alert, #{
+            msg => "plugin_load_failed",
+            name => Name,
+            error => Error,
+            stk => Stacktrace
+        }),
         {error, parse_config_file_failed}
     end.
 
@@ -202,11 +228,22 @@ load_app(App) ->
 start_app(App) ->
     case application:ensure_all_started(App) of
         {ok, Started} ->
-            ?LOG(info, "Started plugins: ~p", [Started]),
-            ?LOG(info, "Load plugin ~s successfully", [App]),
+            ?SLOG(info, #{
+                msg => "all_started_plugins",
+                started => Started
+            }),
+            ?SLOG(info, #{
+                msg => "load_plugin_app_successfully",
+                app => App
+            }),
             ok;
         {error, {ErrApp, Reason}} ->
-            ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
+            ?SLOG(error, #{
+                msg => "load_plugin_failed_cannot_started",
+                app => App,
+                err_app => ErrApp,
+                reason => Reason
+            }),
             {error, {ErrApp, Reason}}
     end.
 
@@ -221,11 +258,24 @@ unload_plugin(App) ->
 stop_app(App) ->
     case application:stop(App) of
         ok ->
-            ?LOG(info, "Stop plugin ~s successfully", [App]), ok;
+            ?SLOG(info, #{
+                msg => "stop_plugin_successfully",
+                app => App
+            }),
+            ok;
         {error, {not_started, App}} ->
-            ?LOG(error, "Plugin ~s is not started", [App]), ok;
+            ?SLOG(error, #{
+                msg => "plugin_not_started",
+                app => App
+            }),
+            ok;
         {error, Reason} ->
-            ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
+            ?SLOG(error, #{
+                msg => "stop_plugin",
+                app => App,
+                error => Reason
+            }),
+            {error, Reason}
     end.
 
 names(plugin) ->
@@ -238,4 +288,7 @@ names(Plugins) ->
     [Name || #plugin{name = Name} <- Plugins].
 
 funlog(Key, Value) ->
-    ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).
+    ?SLOG(info, #{
+        key => string:join(Key, "."),
+        value => Value
+    }).

+ 8 - 4
apps/emqx/src/emqx_pool.erl

@@ -100,22 +100,26 @@ handle_call({submit, Task}, _From, State) ->
     {reply, catch run(Task), State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({async_submit, Task}, State) ->
     try run(Task)
     catch _:Error:Stacktrace ->
-        ?LOG(error, "Error: ~0p, ~0p", [Error, Stacktrace])
+        ?SLOG(error, #{
+            msg => "error",
+            error => Error,
+            stk => Stacktrace
+        })
     end,
     {noreply, State};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{pool := Pool, id := Id}) ->

+ 3 - 3
apps/emqx/src/emqx_router.erl

@@ -203,15 +203,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
     {reply, Ok, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{pool := Pool, id := Id}) ->

+ 7 - 4
apps/emqx/src/emqx_router_helper.erl

@@ -109,11 +109,11 @@ init([]) ->
     {ok, #{nodes => Nodes}, hibernate}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}},
@@ -130,7 +130,10 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
     {noreply, State};
 
 handle_info({mnesia_table_event, Event}, State) ->
-    ?LOG(error, "Unexpected mnesia_table_event: ~p", [Event]),
+    ?SLOG(error,#{
+        msg => "unexpected_mnesia_table_event",
+        event => Event
+    }),
     {noreply, State};
 
 handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
@@ -148,7 +151,7 @@ handle_info({membership, _Event}, State) ->
     {noreply, State};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 8 - 3
apps/emqx/src/emqx_session.erl

@@ -479,11 +479,16 @@ log_dropped(Msg = #message{qos = QoS}, #session{mqueue = Q}) ->
     case (QoS == ?QOS_0) andalso (not emqx_mqueue:info(store_qos0, Q)) of
         true  ->
             ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
-            ?LOG(warning, "Dropped qos0 msg: ~s", [emqx_message:format(Msg)]);
+            ?SLOG(warning, #{
+                msg => "dropped_qos0_msg",
+                payload => emqx_message:format(Msg)
+            });
         false ->
             ok = emqx_metrics:inc('delivery.dropped.queue_full'),
-            ?LOG(warning, "Dropped msg due to mqueue is full: ~s",
-                 [emqx_message:format(Msg)])
+            ?SLOG(warning, #{
+                    msg => "dropped_msg_due_to_mqueue_is_full",
+                    payload => emqx_message:format(Msg)
+            })
     end.
 
 enrich_fun(Session = #session{subscriptions = Subs}) ->

+ 6 - 3
apps/emqx/src/emqx_shared_sub.erl

@@ -325,11 +325,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
@@ -348,7 +348,10 @@ handle_info({mnesia_table_event, _Event}, State) ->
     {noreply, State};
 
 handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
-    ?LOG(info, "Shared subscriber down: ~p", [SubPid]),
+    ?SLOG(info, #{
+        msg => "shared_subscriber_down",
+        sub_pid => SubPid
+    }),
     cleanup_down(SubPid),
     {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
 

+ 17 - 6
apps/emqx/src/emqx_stats.erl

@@ -202,7 +202,7 @@ handle_call(stop, _From, State) ->
     {stop, normal, ok, State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast({setstat, Stat, MaxStat, Val}, State) ->
@@ -221,7 +221,10 @@ handle_cast({update_interval, Update = #update{name = Name}},
             State = #state{updates = Updates}) ->
     NState = case lists:keyfind(Name, #update.name, Updates) of
                  #update{} ->
-                     ?LOG(warning, "Duplicated update: ~s", [Name]),
+                     ?SLOG(warning, #{
+                         msg => "duplicated_update",
+                         name => Name
+                        }),
                      State;
                  false -> State#state{updates = [Update|Updates]}
              end,
@@ -232,7 +235,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
     {noreply, State#state{updates = Updates1}};
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
@@ -242,7 +245,11 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
                          try UpFun()
                          catch
                              _:Error ->
-                                 ?LOG(error, "Update ~s failed: ~0p", [Name, Error])
+                                 ?SLOG(error, #{
+                                     msg => "update_name_failed",
+                                     name => Name,
+                                     error => Error
+                                    })
                          end,
                          [Update#update{countdown = I} | Acc];
                     (Update = #update{countdown = C}, Acc) ->
@@ -251,7 +258,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
     {noreply, start_timer(State#state{updates = Updates1}), hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #state{timer = TRef}) ->
@@ -271,6 +278,10 @@ safe_update_element(Key, Val) ->
         true -> true
     catch
         error:badarg ->
-            ?LOG(warning, "Failed to update ~0p to ~0p", [Key, Val])
+            ?SLOG(warning, #{
+                msg => "failed_to_update",
+                key => Key,
+                val => Val
+            })
     end.
 

+ 3 - 3
apps/emqx/src/emqx_sys.erl

@@ -134,11 +134,11 @@ handle_call(uptime, _From, State) ->
     {reply, uptime(State), State};
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
@@ -156,7 +156,7 @@ handle_info({timeout, TRef, tick},
     {noreply, tick(State), hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->

+ 29 - 9
apps/emqx/src/emqx_sys_mon.erl

@@ -83,18 +83,21 @@ sysm_opts([_Opt|Opts], Acc) ->
     sysm_opts(Opts, Acc).
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{msg => "unexpected_cast", req => Msg}),
     {noreply, State}.
 
 handle_info({monitor, Pid, long_gc, Info}, State) ->
     suppress({long_gc, Pid},
              fun() ->
                  WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
-                 ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     pid_info => procinfo(Pid)
+                    }),
                  safe_publish(long_gc, WarnMsg)
              end, State);
 
@@ -102,7 +105,10 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
     suppress({long_schedule, Pid},
              fun() ->
                  WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
-                 ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     pid_info => procinfo(Pid)
+                    }),
                  safe_publish(long_schedule, WarnMsg)
              end, State);
 
@@ -110,7 +116,10 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
     suppress({long_schedule, Port},
              fun() ->
                  WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
-                 ?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     port_info => erlang:port_info(Port)
+                    }),
                  safe_publish(long_schedule, WarnMsg)
              end, State);
 
@@ -118,7 +127,10 @@ handle_info({monitor, Pid, large_heap, Info}, State) ->
     suppress({large_heap, Pid},
              fun() ->
                  WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
-                 ?LOG(warning, "~s~n~p", [WarnMsg, procinfo(Pid)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     pid_info => procinfo(Pid)
+                    }),
                  safe_publish(large_heap, WarnMsg)
              end, State);
 
@@ -126,7 +138,11 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
     suppress({busy_port, Port},
              fun() ->
                  WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
-                 ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     pid_info => procinfo(SusPid),
+                     port_info => erlang:port_info(Port)
+                    }),
                  safe_publish(busy_port, WarnMsg)
              end, State);
 
@@ -134,7 +150,11 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
     suppress({busy_dist_port, Port},
              fun() ->
                  WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
-                 ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
+                 ?SLOG(warning, #{
+                     warn_msg => WarnMsg,
+                     pid_info => procinfo(SusPid),
+                     port_info => erlang:port_info(Port)
+                    }),
                  safe_publish(busy_dist_port, WarnMsg)
              end, State);
 
@@ -142,7 +162,7 @@ handle_info({timeout, _Ref, reset}, State) ->
     {noreply, State#{events := []}, hibernate};
 
 handle_info(Info, State) ->
-    ?LOG(error, "Unexpected Info: ~p", [Info]),
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
 terminate(_Reason, #{timer := TRef}) ->

+ 11 - 4
apps/emqx/src/emqx_tracer.erl

@@ -115,18 +115,25 @@ install_trace_handler(Who, Level, LogFile) ->
                                           {fun filter_by_meta_key/2, Who}}]})
     of
         ok ->
-            ?LOG(info, "Start trace for ~p", [Who]);
+            ?SLOG(info, #{msg => "start_trace_for", who => Who});
         {error, Reason} ->
-            ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
+            ?SLOG(error, #{msg => "start_trace_for_who_failed", who => Who, reason => Reason}),
             {error, Reason}
     end.
 
 uninstall_trance_handler(Who) ->
     case logger:remove_handler(handler_id(Who)) of
         ok ->
-            ?LOG(info, "Stop trace for ~p", [Who]);
+            ?SLOG(info, #{
+                msg => "stop_trace_for",
+                who => Who
+            });
         {error, Reason} ->
-            ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
+            ?SLOG(error, #{
+                msg => "stop_trace_for",
+                who => Who,
+                reason => Reason
+            }),
             {error, Reason}
     end.
 

+ 12 - 3
apps/emqx/src/emqx_vm_mon.erl

@@ -49,11 +49,17 @@ init([]) ->
     {ok, #{}}.
 
 handle_call(Req, _From, State) ->
-    ?LOG(error, "[VM_MON] Unexpected call: ~p", [Req]),
+    ?SLOG(error, #{
+        msg => "[VM_MON]_unexpected_call",
+        req => Req
+    }),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    ?LOG(error, "[VM_MON] Unexpected cast: ~p", [Msg]),
+    ?SLOG(error, #{
+        msg => "[VM_MON]_unexpected_cast",
+        cast => Msg
+    }),
     {noreply, State}.
 
 handle_info({timeout, _Timer, check}, State) ->
@@ -75,7 +81,10 @@ handle_info({timeout, _Timer, check}, State) ->
     {noreply, State};
 
 handle_info(Info, State) ->
-    ?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]),
+    ?SLOG(error, #{
+        msg => "[VM_MON]_unexpected_info",
+        info => Info
+    }),
     {noreply, State}.
 
 terminate(_Reason, _State) ->

+ 20 - 14
apps/emqx/src/emqx_ws_connection.erl

@@ -182,7 +182,10 @@ init(Req, #{listener := {Type, Listener}} = Opts) ->
               },
     case check_origin_header(Req, Opts) of
         {error, Message} ->
-            ?LOG(error, "Invalid Origin Header ~p~n", [Message]),
+            ?SLOG(error, #{
+                msg => "invalid_origin_header",
+                payload => Message
+            }),
             {ok, cowboy_req:reply(403, Req), WsOpts};
         ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts)
     end.
@@ -263,11 +266,12 @@ websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) ->
     WsCookie = try cowboy_req:parse_cookies(Req)
                catch
                    error:badarg ->
-                       ?LOG(error, "Illegal cookie"),
+                       ?SLOG(error, #{msg => "illegal_cookie"}),
                        undefined;
                    Error:Reason ->
-                       ?LOG(error, "Failed to parse cookie, Error: ~0p, Reason ~0p",
-                            [Error, Reason]),
+                       ?SLOG(error, #{msg => "failed_to_parse_cookie",
+                           error => Error,
+                           reason => Reason}),
                        undefined
                end,
     ConnInfo = #{socktype  => ws,
@@ -324,7 +328,7 @@ websocket_handle({binary, Data}, State) when is_list(Data) ->
     websocket_handle({binary, iolist_to_binary(Data)}, State);
 
 websocket_handle({binary, Data}, State) ->
-    ?LOG(debug, "RECV ~0p", [Data]),
+    ?SLOG(debug, #{msg => "recv_data", data => Data}),
     ok = inc_recv_stats(1, iolist_size(Data)),
     NState = ensure_stats_timer(State),
     return(parse_incoming(Data, NState));
@@ -339,7 +343,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
 
 websocket_handle({Frame, _}, State) ->
     %% TODO: should not close the ws connection
-    ?LOG(error, "Unexpected frame - ~p", [Frame]),
+    ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
     shutdown(unexpected_ws_frame, State).
 
 websocket_info({call, From, Req}, State) ->
@@ -397,11 +401,11 @@ websocket_info(Info, State) ->
 websocket_close({_, ReasonCode, _Payload}, State) when is_integer(ReasonCode) ->
     websocket_close(ReasonCode, State);
 websocket_close(Reason, State) ->
-    ?LOG(debug, "Websocket closed due to ~p~n", [Reason]),
+    ?SLOG(debug, #{msg => "websocket_closed", reason => Reason}),
     handle_info({sock_closed, Reason}, State).
 
 terminate(Reason, _Req, #state{channel = Channel}) ->
-    ?LOG(debug, "Terminated due to ~p", [Reason]),
+    ?SLOG(debug, #{msg => "terminated", reason => Reason}),
     emqx_channel:terminate(Reason, Channel);
 
 terminate(_Reason, _Req, _UnExpectedState) ->
@@ -446,7 +450,7 @@ handle_info({connack, ConnAck}, State) ->
     return(enqueue(ConnAck, State));
 
 handle_info({close, Reason}, State) ->
-    ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
+    ?SLOG(debug, #{msg => "force_to_close_the_socket", reason => Reason}),
     return(enqueue({close, Reason}, State));
 
 handle_info({event, connected}, State = #state{channel = Channel}) ->
@@ -499,7 +503,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
         {ok, Limiter1} ->
             State#state{limiter = Limiter1};
         {pause, Time, Limiter1} ->
-            ?LOG(warning, "Pause ~pms due to rate limit", [Time]),
+            ?SLOG(warning, #{msg => "pause_due_to_rate_limit", time => Time}),
             TRef = start_timer(Time, limit_timeout),
             NState = State#state{sockstate   = blocked,
                                  limiter     = Limiter1,
@@ -570,7 +574,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
 
 handle_incoming(Packet, State = #state{listener = {Type, Listener}})
   when is_record(Packet, mqtt_packet) ->
-    ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
+    ?SLOG(debug, #{msg => "RECV", packet => emqx_packet:format(Packet)}),
     ok = inc_incoming_stats(Packet),
     NState = case emqx_pd:get_counter(incoming_pubs) >
                   get_active_n(Type, Listener) of
@@ -628,12 +632,14 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
     fun(Packet) ->
         try emqx_frame:serialize_pkt(Packet, Serialize) of
-            <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
-                         [emqx_packet:format(Packet)]),
+            <<>> -> ?SLOG(warning, #{
+                        msg => "packet_is_discarded_due_to_the_frame_is_too_large",
+                        packet => emqx_packet:format(Packet)
+                    }),
                     ok = emqx_metrics:inc('delivery.dropped.too_large'),
                     ok = emqx_metrics:inc('delivery.dropped'),
                     <<>>;
-            Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+            Data -> ?SLOG(debug, #{msg => "SEND", packet => Packet}),
                     ok = inc_outgoing_stats(Packet),
                     Data
         catch