|
@@ -128,6 +128,8 @@
|
|
|
-dialyzer({no_match, [info/2]}).
|
|
-dialyzer({no_match, [info/2]}).
|
|
|
-dialyzer({nowarn_function, [websocket_init/1]}).
|
|
-dialyzer({nowarn_function, [websocket_init/1]}).
|
|
|
|
|
|
|
|
|
|
+-define(LOG(Level, Data), ?SLOG(Level, (Data)#{tag => "MQTT"})).
|
|
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
%% Info, Stats
|
|
%% Info, Stats
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
@@ -401,7 +403,7 @@ get_peer_info(Type, Listener, Req, Opts) ->
|
|
|
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
|
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
|
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
|
websocket_handle({binary, iolist_to_binary(Data)}, State);
|
|
|
websocket_handle({binary, Data}, State) ->
|
|
websocket_handle({binary, Data}, State) ->
|
|
|
- ?SLOG(debug, #{
|
|
|
|
|
|
|
+ ?LOG(debug, #{
|
|
|
msg => "raw_bin_received",
|
|
msg => "raw_bin_received",
|
|
|
size => iolist_size(Data),
|
|
size => iolist_size(Data),
|
|
|
bin => binary_to_list(binary:encode_hex(Data)),
|
|
bin => binary_to_list(binary:encode_hex(Data)),
|
|
@@ -428,7 +430,7 @@ websocket_handle({Frame, _}, State) when Frame =:= ping; Frame =:= pong ->
|
|
|
return(State);
|
|
return(State);
|
|
|
websocket_handle({Frame, _}, State) ->
|
|
websocket_handle({Frame, _}, State) ->
|
|
|
%% TODO: should not close the ws connection
|
|
%% TODO: should not close the ws connection
|
|
|
- ?SLOG(error, #{msg => "unexpected_frame", frame => Frame}),
|
|
|
|
|
|
|
+ ?LOG(error, #{msg => "unexpected_frame", frame => Frame}),
|
|
|
shutdown(unexpected_ws_frame, State).
|
|
shutdown(unexpected_ws_frame, State).
|
|
|
websocket_info({call, From, Req}, State) ->
|
|
websocket_info({call, From, Req}, State) ->
|
|
|
handle_call(From, Req, State);
|
|
handle_call(From, Req, State);
|
|
@@ -714,7 +716,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|
|
parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
|
|
parse_incoming(Rest, [{incoming, Packet} | Packets], NState)
|
|
|
catch
|
|
catch
|
|
|
throw:{?FRAME_PARSE_ERROR, Reason} ->
|
|
throw:{?FRAME_PARSE_ERROR, Reason} ->
|
|
|
- ?SLOG(info, #{
|
|
|
|
|
|
|
+ ?LOG(info, #{
|
|
|
reason => Reason,
|
|
reason => Reason,
|
|
|
at_state => emqx_frame:describe_state(ParseState),
|
|
at_state => emqx_frame:describe_state(ParseState),
|
|
|
input_bytes => Data
|
|
input_bytes => Data
|
|
@@ -722,7 +724,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
|
|
|
FrameError = {frame_error, Reason},
|
|
FrameError = {frame_error, Reason},
|
|
|
{[{incoming, FrameError} | Packets], State};
|
|
{[{incoming, FrameError} | Packets], State};
|
|
|
error:Reason:Stacktrace ->
|
|
error:Reason:Stacktrace ->
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
|
|
+ ?LOG(error, #{
|
|
|
at_state => emqx_frame:describe_state(ParseState),
|
|
at_state => emqx_frame:describe_state(ParseState),
|
|
|
input_bytes => Data,
|
|
input_bytes => Data,
|
|
|
exception => Reason,
|
|
exception => Reason,
|
|
@@ -812,7 +814,7 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|
|
fun(Packet) ->
|
|
fun(Packet) ->
|
|
|
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
|
try emqx_frame:serialize_pkt(Packet, Serialize) of
|
|
|
<<>> ->
|
|
<<>> ->
|
|
|
- ?SLOG(warning, #{
|
|
|
|
|
|
|
+ ?LOG(warning, #{
|
|
|
msg => "packet_discarded",
|
|
msg => "packet_discarded",
|
|
|
reason => "frame_too_large",
|
|
reason => "frame_too_large",
|
|
|
packet => emqx_packet:format(Packet)
|
|
packet => emqx_packet:format(Packet)
|
|
@@ -828,13 +830,13 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
|
|
|
catch
|
|
catch
|
|
|
%% Maybe Never happen.
|
|
%% Maybe Never happen.
|
|
|
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
|
|
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
|
|
|
- ?SLOG(info, #{
|
|
|
|
|
|
|
+ ?LOG(info, #{
|
|
|
reason => Reason,
|
|
reason => Reason,
|
|
|
input_packet => Packet
|
|
input_packet => Packet
|
|
|
}),
|
|
}),
|
|
|
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
|
|
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
|
|
|
error:Reason:Stacktrace ->
|
|
error:Reason:Stacktrace ->
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
|
|
+ ?LOG(error, #{
|
|
|
input_packet => Packet,
|
|
input_packet => Packet,
|
|
|
exception => Reason,
|
|
exception => Reason,
|
|
|
stacktrace => Stacktrace
|
|
stacktrace => Stacktrace
|