|
|
@@ -18,50 +18,58 @@
|
|
|
-module(emqx_stomp_protocol).
|
|
|
|
|
|
-include("emqx_stomp.hrl").
|
|
|
+
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
+-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
|
|
+-logger_header("[Stomp-Proto]").
|
|
|
+
|
|
|
-import(proplists, [get_value/2, get_value/3]).
|
|
|
|
|
|
%% API
|
|
|
--export([ init/3
|
|
|
+-export([ init/2
|
|
|
, info/1
|
|
|
]).
|
|
|
|
|
|
-export([ received/2
|
|
|
, send/2
|
|
|
, shutdown/2
|
|
|
+ , timeout/3
|
|
|
]).
|
|
|
|
|
|
--record(stomp_proto, {peername,
|
|
|
- sendfun,
|
|
|
- connected = false,
|
|
|
- proto_ver,
|
|
|
- proto_name,
|
|
|
- heart_beats,
|
|
|
- login,
|
|
|
- allow_anonymous,
|
|
|
- default_user,
|
|
|
- subscriptions = []}).
|
|
|
+-record(stomp_proto, {
|
|
|
+ peername,
|
|
|
+ heartfun,
|
|
|
+ sendfun,
|
|
|
+ connected = false,
|
|
|
+ proto_ver,
|
|
|
+ proto_name,
|
|
|
+ heart_beats,
|
|
|
+ login,
|
|
|
+ allow_anonymous,
|
|
|
+ default_user,
|
|
|
+ subscriptions = [],
|
|
|
+ timers :: #{atom() => disable | undefined | reference()}
|
|
|
+ }).
|
|
|
+
|
|
|
+-define(TIMER_TABLE, #{
|
|
|
+ incoming_timer => incoming,
|
|
|
+ outgoing_timer => outgoing
|
|
|
+ }).
|
|
|
|
|
|
-type(stomp_proto() :: #stomp_proto{}).
|
|
|
|
|
|
--define(LOG(Level, Format, Args, State),
|
|
|
- emqx_logger:Level("Stomp(~s): " ++ Format, [esockd:format(State#stomp_proto.peername) | Args])).
|
|
|
-
|
|
|
--define(record_to_proplist(Def, Rec),
|
|
|
- lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))).
|
|
|
-
|
|
|
--define(record_to_proplist(Def, Rec, Fields),
|
|
|
- [{K, V} || {K, V} <- ?record_to_proplist(Def, Rec),
|
|
|
- lists:member(K, Fields)]).
|
|
|
-
|
|
|
%% @doc Init protocol
|
|
|
-init(Peername, SendFun, Env) ->
|
|
|
+init(#{peername := Peername,
|
|
|
+ sendfun := SendFun,
|
|
|
+ heartfun := HeartFun}, Env) ->
|
|
|
AllowAnonymous = get_value(allow_anonymous, Env, false),
|
|
|
DefaultUser = get_value(default_user, Env),
|
|
|
#stomp_proto{peername = Peername,
|
|
|
+ heartfun = HeartFun,
|
|
|
sendfun = SendFun,
|
|
|
+ timers = #{},
|
|
|
allow_anonymous = AllowAnonymous,
|
|
|
default_user = DefaultUser}.
|
|
|
|
|
|
@@ -78,9 +86,10 @@ info(#stomp_proto{connected = Connected,
|
|
|
{login, Login},
|
|
|
{subscriptions, Subscriptions}].
|
|
|
|
|
|
--spec(received(stomp_frame(), stomp_proto()) -> {ok, stomp_proto()}
|
|
|
- | {error, any(), stomp_proto()}
|
|
|
- | {stop, any(), stomp_proto()}).
|
|
|
+-spec(received(stomp_frame(), stomp_proto())
|
|
|
+ -> {ok, stomp_proto()}
|
|
|
+ | {error, any(), stomp_proto()}
|
|
|
+ | {stop, any(), stomp_proto()}).
|
|
|
received(Frame = #stomp_frame{command = <<"STOMP">>}, State) ->
|
|
|
received(Frame#stomp_frame{command = <<"CONNECT">>}, State);
|
|
|
|
|
|
@@ -92,12 +101,11 @@ received(#stomp_frame{command = <<"CONNECT">>, headers = Headers},
|
|
|
Passc = header(<<"passcode">>, Headers),
|
|
|
case check_login(Login, Passc, AllowAnonymous, DefaultUser) of
|
|
|
true ->
|
|
|
- Heartbeats = header(<<"heart-beat">>, Headers, <<"0,0">>),
|
|
|
- self() ! {heartbeat, start, parse_heartbeats(Heartbeats)},
|
|
|
- NewState = State#stomp_proto{connected = true, proto_ver = Version,
|
|
|
- heart_beats = Heartbeats, login = Login},
|
|
|
+ Heartbeats = parse_heartbeats(header(<<"heart-beat">>, Headers, <<"0,0">>)),
|
|
|
+ NState = start_heartbeart_timer(Heartbeats, State#stomp_proto{connected = true,
|
|
|
+ proto_ver = Version, login = Login}),
|
|
|
send(connected_frame([{<<"version">>, Version},
|
|
|
- {<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NewState);
|
|
|
+ {<<"heart-beat">>, reverse_heartbeats(Heartbeats)}]), NState);
|
|
|
false ->
|
|
|
send(error_frame(undefined, <<"Login or passcode error!">>), State),
|
|
|
{error, login_or_passcode_error, State}
|
|
|
@@ -206,8 +214,8 @@ received(#stomp_frame{command = <<"BEGIN">>, headers = Headers}, State) ->
|
|
|
received(#stomp_frame{command = <<"COMMIT">>, headers = Headers}, State) ->
|
|
|
Id = header(<<"transaction">>, Headers),
|
|
|
case emqx_stomp_transaction:commit(Id, State) of
|
|
|
- {ok, NewState} ->
|
|
|
- maybe_send_receipt(receipt_id(Headers), NewState);
|
|
|
+ {ok, NState} ->
|
|
|
+ maybe_send_receipt(receipt_id(Headers), NState);
|
|
|
{error, not_found} ->
|
|
|
send(error_frame(receipt_id(Headers), ["Transaction ", Id, " not found"]), State)
|
|
|
end;
|
|
|
@@ -248,17 +256,40 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
|
|
body = Payload},
|
|
|
send(Frame, State);
|
|
|
false ->
|
|
|
- ?LOG(error, "Stomp dropped: ~p", [Msg], State),
|
|
|
+ ?LOG(error, "Stomp dropped: ~p", [Msg]),
|
|
|
{error, dropped, State}
|
|
|
end;
|
|
|
|
|
|
-send(Frame, State = #stomp_proto{sendfun = SendFun}) ->
|
|
|
- ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)], State),
|
|
|
+send(Frame, State = #stomp_proto{sendfun = {Fun, Args}}) ->
|
|
|
+ ?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
|
|
Data = emqx_stomp_frame:serialize(Frame),
|
|
|
- ?LOG(debug, "SEND ~p", [Data], State),
|
|
|
- SendFun(Data),
|
|
|
+ ?LOG(debug, "SEND ~p", [Data]),
|
|
|
+ erlang:apply(Fun, [Data] ++ Args),
|
|
|
{ok, State}.
|
|
|
|
|
|
+shutdown(_Reason, _State) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
+timeout(_TRef, {incoming, NewVal},
|
|
|
+ State = #stomp_proto{heart_beats = HrtBt}) ->
|
|
|
+ case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of
|
|
|
+ {error, timeout} ->
|
|
|
+ {shutdown, heartbeat_timeout, State};
|
|
|
+ {ok, NHrtBt} ->
|
|
|
+ {ok, reset_timer(incoming_timer, State#stomp_proto{heart_beats = NHrtBt})}
|
|
|
+ end;
|
|
|
+
|
|
|
+timeout(_TRef, {outgoing, NewVal},
|
|
|
+ State = #stomp_proto{heart_beats = HrtBt,
|
|
|
+ heartfun = {Fun, Args}}) ->
|
|
|
+ case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
|
|
+ {error, timeout} ->
|
|
|
+ _ = erlang:apply(Fun, Args),
|
|
|
+ {ok, State};
|
|
|
+ {ok, NHrtBt} ->
|
|
|
+ {ok, reset_timer(outgoing_timer, State#stomp_proto{heart_beats = NHrtBt})}
|
|
|
+ end.
|
|
|
+
|
|
|
negotiate_version(undefined) ->
|
|
|
{ok, <<"1.0">>};
|
|
|
negotiate_version(Accepts) ->
|
|
|
@@ -322,17 +353,6 @@ error_frame(Headers, undefined, Msg) ->
|
|
|
error_frame(Headers, ReceiptId, Msg) ->
|
|
|
emqx_stomp_frame:make(<<"ERROR">>, [{<<"receipt-id">>, ReceiptId} | Headers], Msg).
|
|
|
|
|
|
-parse_heartbeats(Heartbeats) ->
|
|
|
- CxCy = re:split(Heartbeats, <<",">>, [{return, list}]),
|
|
|
- list_to_tuple([list_to_integer(S) || S <- CxCy]).
|
|
|
-
|
|
|
-reverse_heartbeats(Heartbeats) ->
|
|
|
- CxCy = re:split(Heartbeats, <<",">>, [{return, list}]),
|
|
|
- list_to_binary(string:join(lists:reverse(CxCy), ",")).
|
|
|
-
|
|
|
-shutdown(_Reason, _State) ->
|
|
|
- ok.
|
|
|
-
|
|
|
next_msgid() ->
|
|
|
MsgId = case get(msgid) of
|
|
|
undefined -> 1;
|
|
|
@@ -363,3 +383,52 @@ make_mqtt_message(Topic, Headers, Body) ->
|
|
|
receipt_id(Headers) ->
|
|
|
header(<<"receipt">>, Headers).
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Heartbeat
|
|
|
+
|
|
|
+parse_heartbeats(Heartbeats) ->
|
|
|
+ CxCy = re:split(Heartbeats, <<",">>, [{return, list}]),
|
|
|
+ list_to_tuple([list_to_integer(S) || S <- CxCy]).
|
|
|
+
|
|
|
+reverse_heartbeats({Cx, Cy}) ->
|
|
|
+ iolist_to_binary(io_lib:format("~w,~w", [Cy, Cx])).
|
|
|
+
|
|
|
+start_heartbeart_timer(Heartbeats, State) ->
|
|
|
+ ensure_timer(
|
|
|
+ [incoming_timer, outgoing_timer],
|
|
|
+ State#stomp_proto{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
|
|
|
+
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Timer
|
|
|
+
|
|
|
+ensure_timer([Name], State) ->
|
|
|
+ ensure_timer(Name, State);
|
|
|
+ensure_timer([Name | Rest], State) ->
|
|
|
+ ensure_timer(Rest, ensure_timer(Name, State));
|
|
|
+
|
|
|
+ensure_timer(Name, State = #stomp_proto{timers = Timers}) ->
|
|
|
+ TRef = maps:get(Name, Timers, undefined),
|
|
|
+ Time = interval(Name, State),
|
|
|
+ case TRef == undefined andalso is_integer(Time) andalso Time > 0 of
|
|
|
+ true -> ensure_timer(Name, Time, State);
|
|
|
+ false -> State %% Timer disabled or exists
|
|
|
+ end.
|
|
|
+
|
|
|
+ensure_timer(Name, Time, State = #stomp_proto{timers = Timers}) ->
|
|
|
+ Msg = maps:get(Name, ?TIMER_TABLE),
|
|
|
+ TRef = emqx_misc:start_timer(Time, Msg),
|
|
|
+ State#stomp_proto{timers = Timers#{Name => TRef}}.
|
|
|
+
|
|
|
+reset_timer(Name, State) ->
|
|
|
+ ensure_timer(Name, clean_timer(Name, State)).
|
|
|
+
|
|
|
+reset_timer(Name, Time, State) ->
|
|
|
+ ensure_timer(Name, Time, clean_timer(Name, State)).
|
|
|
+
|
|
|
+clean_timer(Name, State = #stomp_proto{timers = Timers}) ->
|
|
|
+ State#stomp_proto{timers = maps:remove(Name, Timers)}.
|
|
|
+
|
|
|
+interval(incoming_timer, #stomp_proto{heart_beats = HrtBt}) ->
|
|
|
+ emqx_stomp_heartbeat:interval(incoming, HrtBt);
|
|
|
+interval(outgoing_timer, #stomp_proto{heart_beats = HrtBt}) ->
|
|
|
+ emqx_stomp_heartbeat:interval(outgoing, HrtBt).
|