|
|
@@ -20,6 +20,7 @@
|
|
|
-include("emqx_stomp.hrl").
|
|
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
+-include_lib("emqx/include/types.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
|
|
@@ -55,7 +56,7 @@
|
|
|
%% Stomp ClientInfo
|
|
|
clientinfo :: emqx_types:clientinfo(),
|
|
|
%% Stomp Heartbeats
|
|
|
- heart_beats :: emqx_stomp_hearbeat:heartbeat(),
|
|
|
+ heart_beats :: maybe(emqx_stomp_hearbeat:heartbeat()),
|
|
|
%% Stomp Connection State
|
|
|
connected = false,
|
|
|
%% Timers
|
|
|
@@ -65,13 +66,13 @@
|
|
|
%% Subscriptions
|
|
|
subscriptions = #{},
|
|
|
%% Send function
|
|
|
- sendfun :: function(),
|
|
|
+ sendfun :: {function(), list()},
|
|
|
%% Heartbeat function
|
|
|
- heartfun :: function(),
|
|
|
+ heartfun :: {function(), list()},
|
|
|
%% The confs for the connection
|
|
|
%% TODO: put these configs into a public mem?
|
|
|
- allow_anonymous,
|
|
|
- default_user
|
|
|
+ allow_anonymous :: maybe(boolean()),
|
|
|
+ default_user :: maybe(list())
|
|
|
}).
|
|
|
|
|
|
-define(TIMER_TABLE, #{
|
|
|
@@ -96,6 +97,10 @@
|
|
|
awaiting_rel_max
|
|
|
]).
|
|
|
|
|
|
+-dialyzer({nowarn_function, [ check_acl/3
|
|
|
+ , init/2
|
|
|
+ ]}).
|
|
|
+
|
|
|
-type(pstate() :: #pstate{}).
|
|
|
|
|
|
%% @doc Init protocol
|
|
|
@@ -417,7 +422,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
|
|
|
{error, dropped, State}
|
|
|
end;
|
|
|
|
|
|
-send(Frame, State = #pstate{sendfun = {Fun, Args}}) ->
|
|
|
+send(Frame, State = #pstate{sendfun = {Fun, Args}}) when is_record(Frame, stomp_frame) ->
|
|
|
?LOG(info, "SEND Frame: ~s", [emqx_stomp_frame:format(Frame)]),
|
|
|
Data = emqx_stomp_frame:serialize(Frame),
|
|
|
?LOG(debug, "SEND ~p", [Data]),
|
|
|
@@ -681,7 +686,7 @@ allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) ->
|
|
|
AllowAnonymous.
|
|
|
|
|
|
ensure_connected(State = #pstate{conninfo = ConnInfo,
|
|
|
- clientinfo = ClientInfo}) ->
|
|
|
+ clientinfo = ClientInfo}) ->
|
|
|
NConnInfo = ConnInfo#{
|
|
|
connected => true,
|
|
|
connected_at => erlang:system_time(millisecond)
|