|
|
@@ -33,8 +33,8 @@
|
|
|
, caps/1
|
|
|
]).
|
|
|
|
|
|
-%% for tests
|
|
|
--export([set/3]).
|
|
|
+%% Exports for unit tests:(
|
|
|
+-export([set_field/3]).
|
|
|
|
|
|
-export([ handle_in/2
|
|
|
, handle_out/2
|
|
|
@@ -45,14 +45,16 @@
|
|
|
, terminate/2
|
|
|
]).
|
|
|
|
|
|
-%% Ensure timer
|
|
|
--export([ensure_timer/2]).
|
|
|
-
|
|
|
--export([gc/3]).
|
|
|
-
|
|
|
--import(emqx_misc, [maybe_apply/2]).
|
|
|
+-export([ received/2
|
|
|
+ , sent/2
|
|
|
+ ]).
|
|
|
|
|
|
--import(emqx_access_control, [check_acl/3]).
|
|
|
+-import(emqx_misc,
|
|
|
+ [ run_fold/2
|
|
|
+ , run_fold/3
|
|
|
+ , pipeline/3
|
|
|
+ , maybe_apply/2
|
|
|
+ ]).
|
|
|
|
|
|
-export_type([channel/0]).
|
|
|
|
|
|
@@ -68,15 +70,14 @@
|
|
|
%% Timers
|
|
|
timers :: #{atom() => disabled | maybe(reference())},
|
|
|
%% GC State
|
|
|
- gc_state :: emqx_gc:gc_state(),
|
|
|
+ gc_state :: maybe(emqx_gc:gc_state()),
|
|
|
%% OOM Policy
|
|
|
- oom_policy :: emqx_oom:oom_policy(),
|
|
|
+ oom_policy :: maybe(emqx_oom:oom_policy()),
|
|
|
%% Connected
|
|
|
- connected :: boolean(),
|
|
|
- %% Disonnected
|
|
|
- disconnected :: boolean(),
|
|
|
+ connected :: undefined | boolean(),
|
|
|
%% Connected at
|
|
|
connected_at :: erlang:timestamp(),
|
|
|
+ %% Disconnected at
|
|
|
disconnected_at :: erlang:timestamp(),
|
|
|
%% Takeover
|
|
|
takeover :: boolean(),
|
|
|
@@ -97,6 +98,10 @@
|
|
|
will_timer => will_message
|
|
|
}).
|
|
|
|
|
|
+-define(ATTR_KEYS, [client, session, protocol, connected, connected_at, disconnected_at]).
|
|
|
+
|
|
|
+-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, gc_state, disconnected_at]).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Init the channel
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -117,22 +122,22 @@ init(ConnInfo, Options) ->
|
|
|
client_id => <<>>,
|
|
|
mountpoint => MountPoint,
|
|
|
is_bridge => false,
|
|
|
- is_superuser => false}, ConnInfo),
|
|
|
+ is_superuser => false
|
|
|
+ }, ConnInfo),
|
|
|
EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
|
|
|
StatsTimer = if
|
|
|
EnableStats -> undefined;
|
|
|
?Otherwise -> disabled
|
|
|
end,
|
|
|
- GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)),
|
|
|
- OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)),
|
|
|
+ GcState = maybe_apply(fun emqx_gc:init/1,
|
|
|
+ emqx_zone:get_env(Zone, force_gc_policy)),
|
|
|
+ OomPolicy = maybe_apply(fun emqx_oom:init/1,
|
|
|
+ emqx_zone:get_env(Zone, force_shutdown_policy)),
|
|
|
#channel{client = Client,
|
|
|
- session = undefined,
|
|
|
- protocol = undefined,
|
|
|
gc_state = GcState,
|
|
|
oom_policy = OomPolicy,
|
|
|
timers = #{stats_timer => StatsTimer},
|
|
|
- connected = false,
|
|
|
- disconnected = false,
|
|
|
+ connected = undefined,
|
|
|
takeover = false,
|
|
|
resuming = false,
|
|
|
pendings = []
|
|
|
@@ -145,27 +150,14 @@ peer_cert_as_username(Options) ->
|
|
|
%% Info, Attrs and Caps
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%% @doc Get infos of the channel.
|
|
|
-spec(info(channel()) -> emqx_types:infos()).
|
|
|
-info(#channel{client = Client,
|
|
|
- session = Session,
|
|
|
- protocol = Protocol,
|
|
|
- keepalive = Keepalive,
|
|
|
- gc_state = GCState,
|
|
|
- oom_policy = OomPolicy,
|
|
|
- connected = Connected,
|
|
|
- connected_at = ConnectedAt
|
|
|
- }) ->
|
|
|
- #{client => Client,
|
|
|
- session => maybe_apply(fun emqx_session:info/1, Session),
|
|
|
- protocol => maybe_apply(fun emqx_protocol:info/1, Protocol),
|
|
|
- keepalive => maybe_apply(fun emqx_keepalive:info/1, Keepalive),
|
|
|
- gc_state => emqx_gc:info(GCState),
|
|
|
- oom_policy => emqx_oom:info(OomPolicy),
|
|
|
- connected => Connected,
|
|
|
- connected_at => ConnectedAt
|
|
|
- }.
|
|
|
-
|
|
|
--spec(info(atom(), channel()) -> term()).
|
|
|
+info(Channel) ->
|
|
|
+ maps:from_list(info(?INFO_KEYS, Channel)).
|
|
|
+
|
|
|
+-spec(info(list(atom())|atom(), channel()) -> term()).
|
|
|
+info(Keys, Channel) when is_list(Keys) ->
|
|
|
+ [{Key, info(Key, Channel)} || Key <- Keys];
|
|
|
info(client, #channel{client = Client}) ->
|
|
|
Client;
|
|
|
info(session, #channel{session = Session}) ->
|
|
|
@@ -174,10 +166,10 @@ info(protocol, #channel{protocol = Protocol}) ->
|
|
|
maybe_apply(fun emqx_protocol:info/1, Protocol);
|
|
|
info(keepalive, #channel{keepalive = Keepalive}) ->
|
|
|
maybe_apply(fun emqx_keepalive:info/1, Keepalive);
|
|
|
-info(gc_state, #channel{gc_state = GCState}) ->
|
|
|
- emqx_gc:info(GCState);
|
|
|
-info(oom_policy, #channel{oom_policy = Policy}) ->
|
|
|
- emqx_oom:info(Policy);
|
|
|
+info(gc_state, #channel{gc_state = GcState}) ->
|
|
|
+ maybe_apply(fun emqx_gc:info/1, GcState);
|
|
|
+info(oom_policy, #channel{oom_policy = OomPolicy}) ->
|
|
|
+ maybe_apply(fun emqx_oom:info/1, OomPolicy);
|
|
|
info(connected, #channel{connected = Connected}) ->
|
|
|
Connected;
|
|
|
info(connected_at, #channel{connected_at = ConnectedAt}) ->
|
|
|
@@ -185,20 +177,17 @@ info(connected_at, #channel{connected_at = ConnectedAt}) ->
|
|
|
info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) ->
|
|
|
DisconnectedAt.
|
|
|
|
|
|
+%% @doc Get attrs of the channel.
|
|
|
-spec(attrs(channel()) -> emqx_types:attrs()).
|
|
|
-attrs(#channel{client = Client,
|
|
|
- session = Session,
|
|
|
- protocol = Protocol,
|
|
|
- connected = Connected,
|
|
|
- connected_at = ConnectedAt}) ->
|
|
|
- #{client => Client,
|
|
|
- session => maybe_apply(fun emqx_session:attrs/1, Session),
|
|
|
- protocol => maybe_apply(fun emqx_protocol:attrs/1, Protocol),
|
|
|
- connected => Connected,
|
|
|
- connected_at => ConnectedAt
|
|
|
- }.
|
|
|
-
|
|
|
-%%TODO: ChanStats?
|
|
|
+attrs(Channel) ->
|
|
|
+ maps:from_list([{Key, attr(Key, Channel)} || Key <- ?ATTR_KEYS]).
|
|
|
+
|
|
|
+attr(protocol, #channel{protocol = Proto}) ->
|
|
|
+ maybe_apply(fun emqx_protocol:attrs/1, Proto);
|
|
|
+attr(session, #channel{session = Session}) ->
|
|
|
+ maybe_apply(fun emqx_session:attrs/1, Session);
|
|
|
+attr(Key, Channel) -> info(Key, Channel).
|
|
|
+
|
|
|
-spec(stats(channel()) -> emqx_types:stats()).
|
|
|
stats(#channel{session = Session}) ->
|
|
|
emqx_session:stats(Session).
|
|
|
@@ -207,16 +196,11 @@ stats(#channel{session = Session}) ->
|
|
|
caps(#channel{client = #{zone := Zone}}) ->
|
|
|
emqx_mqtt_caps:get_caps(Zone).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% For unit tests
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-set(client, Client, Channel) ->
|
|
|
- Channel#channel{client = Client};
|
|
|
-set(session, Session, Channel) ->
|
|
|
- Channel#channel{session = Session};
|
|
|
-set(protocol, Protocol, Channel) ->
|
|
|
- Channel#channel{protocol = Protocol}.
|
|
|
+%% For tests
|
|
|
+set_field(Name, Val, Channel) ->
|
|
|
+ Fields = record_info(fields, channel),
|
|
|
+ Pos = emqx_misc:index_of(Name, Fields),
|
|
|
+ setelement(Pos+1, Channel, Val).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Handle incoming packet
|
|
|
@@ -244,7 +228,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
|
|
handle_out({connack, ReasonCode}, NChannel)
|
|
|
end;
|
|
|
|
|
|
-handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) ->
|
|
|
+handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId),
|
|
|
+ Channel = #channel{protocol = Protocol}) ->
|
|
|
case pipeline([fun validate_packet/2,
|
|
|
fun process_alias/2,
|
|
|
fun check_publish/2], Packet, Channel) of
|
|
|
@@ -255,41 +240,52 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{p
|
|
|
?LOG(warning, "Cannot publish message to ~s due to ~s",
|
|
|
[Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]),
|
|
|
handle_out({disconnect, ReasonCode}, NChannel)
|
|
|
- % case QoS of
|
|
|
- % ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel);
|
|
|
- % ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel);
|
|
|
- % ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel)
|
|
|
- % end
|
|
|
end;
|
|
|
|
|
|
-%%TODO: How to handle the ReasonCode?
|
|
|
-handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
|
|
+handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
|
|
+ Channel = #channel{client = Client, session = Session}) ->
|
|
|
case emqx_session:puback(PacketId, Session) of
|
|
|
- {ok, Publishes, NSession} ->
|
|
|
+ {ok, Msg, Publishes, NSession} ->
|
|
|
+ ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
|
|
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
|
|
- {ok, NSession} ->
|
|
|
+ {ok, Msg, NSession} ->
|
|
|
+ ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
|
|
{ok, Channel#channel{session = NSession}};
|
|
|
- {error, _NotFound} ->
|
|
|
- %%TODO: How to handle NotFound, inc metrics?
|
|
|
+ {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
+ ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.puback.inuse'),
|
|
|
+ {ok, Channel};
|
|
|
+ {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
|
|
+ ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.puback.missed'),
|
|
|
{ok, Channel}
|
|
|
end;
|
|
|
|
|
|
-%%TODO: How to handle the ReasonCode?
|
|
|
-handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
|
|
+handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
|
|
+ Channel = #channel{client = Client, session = Session}) ->
|
|
|
case emqx_session:pubrec(PacketId, Session) of
|
|
|
- {ok, NSession} ->
|
|
|
- handle_out({pubrel, PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession});
|
|
|
- {error, ReasonCode} ->
|
|
|
- handle_out({pubrel, PacketId, ReasonCode}, Channel)
|
|
|
+ {ok, Msg, NSession} ->
|
|
|
+ ok = emqx_hooks:run('message.acked', [Client, Msg]),
|
|
|
+ NChannel = Channel#channel{session = NSession},
|
|
|
+ handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel);
|
|
|
+ {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
+ ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.pubrec.inuse'),
|
|
|
+ handle_out({pubrel, PacketId, RC}, Channel);
|
|
|
+ {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
|
|
+ ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.pubrec.missed'),
|
|
|
+ handle_out({pubrel, PacketId, RC}, Channel)
|
|
|
end;
|
|
|
|
|
|
-%%TODO: How to handle the ReasonCode?
|
|
|
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
|
|
case emqx_session:pubrel(PacketId, Session) of
|
|
|
{ok, NSession} ->
|
|
|
handle_out({pubcomp, PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession});
|
|
|
- {error, ReasonCode} ->
|
|
|
- handle_out({pubcomp, PacketId, ReasonCode}, Channel)
|
|
|
+ {error, NotFound} ->
|
|
|
+ ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.pubrel.missed'),
|
|
|
+ handle_out({pubcomp, PacketId, NotFound}, Channel)
|
|
|
end;
|
|
|
|
|
|
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
|
|
@@ -298,36 +294,27 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|
|
handle_out({publish, Publishes}, Channel#channel{session = NSession});
|
|
|
{ok, NSession} ->
|
|
|
{ok, Channel#channel{session = NSession}};
|
|
|
- {error, _NotFound} ->
|
|
|
- %% TODO: how to handle NotFound?
|
|
|
+ {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
|
|
+ ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('packets.pubcomp.missed'),
|
|
|
{ok, Channel}
|
|
|
end;
|
|
|
|
|
|
-handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
- Channel = #channel{client = Client}) ->
|
|
|
+handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) ->
|
|
|
case validate_packet(Packet, Channel) of
|
|
|
ok ->
|
|
|
- TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts)
|
|
|
- || {TopicFilter, SubOpts} <- TopicFilters],
|
|
|
- TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
|
|
|
- [Client, Properties],
|
|
|
- TopicFilters1),
|
|
|
- TopicFilters3 = enrich_subid(Properties, TopicFilters2),
|
|
|
- {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
|
|
+ TopicFilters = preprocess_subscribe(Properties, RawTopicFilters, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel),
|
|
|
handle_out({suback, PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out({disconnect, ReasonCode}, Channel)
|
|
|
end;
|
|
|
|
|
|
-handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
- Channel = #channel{client = Client}) ->
|
|
|
+handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) ->
|
|
|
case validate_packet(Packet, Channel) of
|
|
|
ok ->
|
|
|
- TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters),
|
|
|
- TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
- [Client, Properties],
|
|
|
- TopicFilters1),
|
|
|
- {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters2, Channel),
|
|
|
+ TopicFilters = preprocess_unsubscribe(Properties, RawTopicFilters, Channel),
|
|
|
+ {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel),
|
|
|
handle_out({unsuback, PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out({disconnect, ReasonCode}, Channel)
|
|
|
@@ -347,7 +334,7 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi
|
|
|
?RC_SUCCESS -> Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)};
|
|
|
_ -> Channel
|
|
|
end,
|
|
|
- Channel2 = ensure_disconnected(Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)}),
|
|
|
+ Channel2 = Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)},
|
|
|
case Interval of
|
|
|
?UINT_MAX ->
|
|
|
{ok, ensure_timer(will_timer, Channel2)};
|
|
|
@@ -382,6 +369,7 @@ process_connect(ConnPkt, Channel) ->
|
|
|
NChannel = Channel#channel{session = Session},
|
|
|
handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel);
|
|
|
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
|
|
+ %%TODO: improve later.
|
|
|
NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
|
|
NChannel = Channel#channel{session = Session,
|
|
|
resuming = true,
|
|
|
@@ -397,39 +385,58 @@ process_connect(ConnPkt, Channel) ->
|
|
|
%% Process Publish
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-%% Process Publish
|
|
|
-process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
|
|
|
- Channel = #channel{client = Client, protocol = Protocol}) ->
|
|
|
- Msg = emqx_packet:to_message(Client, Packet),
|
|
|
- %%TODO: Improve later.
|
|
|
- Msg1 = emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, emqx_protocol:info(proto_ver, Protocol), Msg)),
|
|
|
- process_publish(PacketId, mount(Client, Msg1), Channel).
|
|
|
+process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) ->
|
|
|
+ Msg = publish_to_msg(Packet, Channel),
|
|
|
+ process_publish(PacketId, Msg, Channel).
|
|
|
|
|
|
process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
|
|
|
_ = emqx_broker:publish(Msg),
|
|
|
{ok, Channel};
|
|
|
|
|
|
process_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
|
|
|
- Deliveries = emqx_broker:publish(Msg),
|
|
|
- ReasonCode = emqx_reason_codes:puback(Deliveries),
|
|
|
+ ReasonCode = case emqx_broker:publish(Msg) of
|
|
|
+ [] -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
|
|
+ _ -> ?RC_SUCCESS
|
|
|
+ end,
|
|
|
handle_out({puback, PacketId, ReasonCode}, Channel);
|
|
|
|
|
|
process_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|
|
Channel = #channel{session = Session}) ->
|
|
|
case emqx_session:publish(PacketId, Msg, Session) of
|
|
|
- {ok, Deliveries, NSession} ->
|
|
|
- ReasonCode = emqx_reason_codes:puback(Deliveries),
|
|
|
+ {ok, Results, NSession} ->
|
|
|
+ RC = case Results of
|
|
|
+ [] -> ?RC_NO_MATCHING_SUBSCRIBERS;
|
|
|
+ _ -> ?RC_SUCCESS
|
|
|
+ end,
|
|
|
NChannel = Channel#channel{session = NSession},
|
|
|
- handle_out({pubrec, PacketId, ReasonCode},
|
|
|
- ensure_timer(await_timer, NChannel));
|
|
|
- {error, ReasonCode} ->
|
|
|
- handle_out({pubrec, PacketId, ReasonCode}, Channel)
|
|
|
+ handle_out({pubrec, PacketId, RC}, ensure_timer(await_timer, NChannel));
|
|
|
+ {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
|
|
+ ok = emqx_metrics:inc('packets.publish.inuse'),
|
|
|
+ handle_out({pubrec, PacketId, RC}, Channel);
|
|
|
+ {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
|
|
|
+ ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full", [PacketId]),
|
|
|
+ ok = emqx_metrics:inc('messages.qos2.dropped'),
|
|
|
+ handle_out({pubrec, PacketId, RC}, Channel)
|
|
|
end.
|
|
|
|
|
|
+publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}}) ->
|
|
|
+ Msg = emqx_packet:to_message(Client, Packet),
|
|
|
+ Msg1 = emqx_message:set_flag(dup, false, Msg),
|
|
|
+ emqx_mountpoint:mount(MountPoint, Msg1).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Subscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-compile({inline, [preprocess_subscribe/3]}).
|
|
|
+preprocess_subscribe(Properties, RawTopicFilters, #channel{client = Client}) ->
|
|
|
+ RunHook = fun(TopicFilters) ->
|
|
|
+ emqx_hooks:run_fold('client.subscribe',
|
|
|
+ [Client, Properties], TopicFilters)
|
|
|
+ end,
|
|
|
+ Enrich = fun(TopicFilters) -> enrich_subid(Properties, TopicFilters) end,
|
|
|
+ run_fold([fun parse_topic_filters/1, RunHook, Enrich], RawTopicFilters).
|
|
|
+
|
|
|
process_subscribe(TopicFilters, Channel) ->
|
|
|
process_subscribe(TopicFilters, [], Channel).
|
|
|
|
|
|
@@ -440,16 +447,18 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
{RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
|
|
|
process_subscribe(More, [RC|Acc], NChannel).
|
|
|
|
|
|
-do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
|
|
|
- Channel = #channel{client = Client, session = Session}) ->
|
|
|
+do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
+ #channel{client = Client = #{mountpoint := MountPoint},
|
|
|
+ session = Session}) ->
|
|
|
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
|
|
- ok -> TopicFilter1 = mount(Client, TopicFilter),
|
|
|
- SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
|
|
- case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
|
|
|
- {ok, NSession} ->
|
|
|
- {QoS, Channel#channel{session = NSession}};
|
|
|
- {error, RC} -> {RC, Channel}
|
|
|
- end;
|
|
|
+ ok ->
|
|
|
+ TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
+ SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
|
|
+ case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ {QoS, Channel#channel{session = NSession}};
|
|
|
+ {error, RC} -> {RC, Channel}
|
|
|
+ end;
|
|
|
{error, RC} -> {RC, Channel}
|
|
|
end.
|
|
|
|
|
|
@@ -457,6 +466,15 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
|
|
|
%% Process Unsubscribe
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-compile({inline, [preprocess_unsubscribe/3]}).
|
|
|
+preprocess_unsubscribe(Properties, RawTopicFilter, #channel{client = Client}) ->
|
|
|
+ RunHook = fun(TopicFilters) ->
|
|
|
+ emqx_hooks:run_fold('client.unsubscribe',
|
|
|
+ [Client, Properties], TopicFilters)
|
|
|
+ end,
|
|
|
+ run_fold([fun parse_topic_filters/1, RunHook], RawTopicFilter).
|
|
|
+
|
|
|
+-compile({inline, [process_unsubscribe/2]}).
|
|
|
process_unsubscribe(TopicFilters, Channel) ->
|
|
|
process_unsubscribe(TopicFilters, [], Channel).
|
|
|
|
|
|
@@ -464,12 +482,14 @@ process_unsubscribe([], Acc, Channel) ->
|
|
|
{lists:reverse(Acc), Channel};
|
|
|
|
|
|
process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
- {RC, Channel1} = do_unsubscribe(TopicFilter, SubOpts, Channel),
|
|
|
- process_unsubscribe(More, [RC|Acc], Channel1).
|
|
|
-
|
|
|
-do_unsubscribe(TopicFilter, _SubOpts,
|
|
|
- Channel = #channel{client = Client, session = Session}) ->
|
|
|
- case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of
|
|
|
+ {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel),
|
|
|
+ process_unsubscribe(More, [RC|Acc], NChannel).
|
|
|
+
|
|
|
+do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
+ #channel{client = Client = #{mountpoint := MountPoint},
|
|
|
+ session = Session}) ->
|
|
|
+ TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
+ case emqx_session:unsubscribe(Client, TopicFilter1, Session) of
|
|
|
{ok, NSession} ->
|
|
|
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
|
|
{error, RC} -> {RC, Channel}
|
|
|
@@ -479,15 +499,15 @@ do_unsubscribe(TopicFilter, _SubOpts,
|
|
|
%% Handle outgoing packet
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%%TODO: RunFold or Pipeline
|
|
|
handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) ->
|
|
|
- ok = emqx_hooks:run('client.connected',
|
|
|
- [Client, ?RC_SUCCESS, attrs(Channel)]),
|
|
|
- AckProps = emqx_misc:run_fold([fun enrich_caps/2,
|
|
|
- fun enrich_server_keepalive/2,
|
|
|
- fun enrich_assigned_clientid/2
|
|
|
- ], #{}, Channel),
|
|
|
- AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
|
|
+ AckProps = run_fold([fun enrich_caps/2,
|
|
|
+ fun enrich_server_keepalive/2,
|
|
|
+ fun enrich_assigned_clientid/2
|
|
|
+ ], #{}, Channel),
|
|
|
Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)),
|
|
|
+ ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(Channel1)]),
|
|
|
+ AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
|
|
|
case maybe_resume_session(Channel1) of
|
|
|
ignore -> {ok, AckPacket, Channel1};
|
|
|
{ok, Publishes, NSession} ->
|
|
|
@@ -502,7 +522,10 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client,
|
|
|
protocol = Protocol
|
|
|
}) ->
|
|
|
ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]),
|
|
|
- ProtoVer = emqx_protocol:info(proto_ver, Protocol),
|
|
|
+ ProtoVer = case Protocol of
|
|
|
+ undefined -> undefined;
|
|
|
+ _ -> emqx_protocol:info(proto_ver, Protocol)
|
|
|
+ end,
|
|
|
ReasonCode1 = if
|
|
|
ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode;
|
|
|
true -> emqx_reason_codes:compat(connack, ReasonCode)
|
|
|
@@ -510,9 +533,10 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client,
|
|
|
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
|
|
|
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
|
|
|
|
|
|
-handle_out({deliver, Delivers}, Channel = #channel{session = Session,
|
|
|
+handle_out({deliver, Delivers}, Channel = #channel{session = Session,
|
|
|
connected = false}) ->
|
|
|
- {ok, Channel#channel{session = emqx_session:enqueue(Delivers, Session)}};
|
|
|
+ NSession = emqx_session:enqueue(Delivers, Session),
|
|
|
+ {ok, Channel#channel{session = NSession}};
|
|
|
|
|
|
handle_out({deliver, Delivers}, Channel = #channel{takeover = true,
|
|
|
pendings = Pendings}) ->
|
|
|
@@ -527,23 +551,33 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) ->
|
|
|
{ok, Channel#channel{session = NSession}}
|
|
|
end;
|
|
|
|
|
|
-handle_out({publish, Publishes}, Channel) ->
|
|
|
- Packets = lists:map(
|
|
|
- fun(Publish) ->
|
|
|
- element(2, handle_out(Publish, Channel))
|
|
|
- end, Publishes),
|
|
|
- {ok, Packets, Channel};
|
|
|
-
|
|
|
-handle_out({publish, PacketId, Msg}, Channel = #channel{client = Client}) ->
|
|
|
- Msg1 = emqx_hooks:run_fold('message.deliver', [Client],
|
|
|
- emqx_message:update_expiry(Msg)),
|
|
|
- Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)),
|
|
|
- {ok, Packet, Channel};
|
|
|
-
|
|
|
-%% TODO: How to handle the puberr?
|
|
|
-handle_out({puberr, _ReasonCode}, Channel) ->
|
|
|
+handle_out({publish, [Publish]}, Channel) ->
|
|
|
+ handle_out(Publish, Channel);
|
|
|
+
|
|
|
+handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
|
|
|
+ Packets = lists:foldl(
|
|
|
+ fun(Publish, Acc) ->
|
|
|
+ case handle_out(Publish, Channel) of
|
|
|
+ {ok, Packet, _Ch} ->
|
|
|
+ [Packet|Acc];
|
|
|
+ {ok, _Ch} -> Acc
|
|
|
+ end
|
|
|
+ end, [], Publishes),
|
|
|
+ {ok, lists:reverse(Packets), Channel};
|
|
|
+
|
|
|
+%% Ignore loop deliver
|
|
|
+handle_out({publish, _PacketId, #message{from = ClientId,
|
|
|
+ flags = #{nl := true}}},
|
|
|
+ Channel = #channel{client = #{client_id := ClientId}}) ->
|
|
|
{ok, Channel};
|
|
|
|
|
|
+handle_out({publish, PacketId, Msg}, Channel =
|
|
|
+ #channel{client = Client = #{mountpoint := MountPoint}}) ->
|
|
|
+ Msg1 = emqx_message:update_expiry(Msg),
|
|
|
+ Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1),
|
|
|
+ Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
|
|
|
+ {ok, emqx_packet:from_message(PacketId, Msg3), Channel};
|
|
|
+
|
|
|
handle_out({puback, PacketId, ReasonCode}, Channel) ->
|
|
|
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
|
|
|
|
|
|
@@ -556,25 +590,21 @@ handle_out({pubrec, PacketId, ReasonCode}, Channel) ->
|
|
|
handle_out({pubcomp, PacketId, ReasonCode}, Channel) ->
|
|
|
{ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
|
|
|
|
|
|
-handle_out({suback, PacketId, ReasonCodes},
|
|
|
- Channel = #channel{protocol = Protocol}) ->
|
|
|
- ReasonCodes1 =
|
|
|
- case emqx_protocol:info(proto_ver, Protocol) of
|
|
|
- ?MQTT_PROTO_V5 -> ReasonCodes;
|
|
|
- _Ver ->
|
|
|
- [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes]
|
|
|
- end,
|
|
|
+handle_out({suback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) ->
|
|
|
+ ReasonCodes1 = case emqx_protocol:info(proto_ver, Protocol) of
|
|
|
+ ?MQTT_PROTO_V5 -> ReasonCodes;
|
|
|
+ _Ver ->
|
|
|
+ [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes]
|
|
|
+ end,
|
|
|
{ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel};
|
|
|
|
|
|
-handle_out({unsuback, PacketId, ReasonCodes},
|
|
|
- Channel = #channel{protocol = Protocol}) ->
|
|
|
- Packet = case emqx_protocol:info(proto_ver, Protocol) of
|
|
|
- ?MQTT_PROTO_V5 ->
|
|
|
- ?UNSUBACK_PACKET(PacketId, ReasonCodes);
|
|
|
- %% Ignore reason codes if not MQTT5
|
|
|
- _Ver -> ?UNSUBACK_PACKET(PacketId)
|
|
|
- end,
|
|
|
- {ok, Packet, Channel};
|
|
|
+handle_out({unsuback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) ->
|
|
|
+ Unsuback = case emqx_protocol:info(proto_ver, Protocol) of
|
|
|
+ ?MQTT_PROTO_V5 ->
|
|
|
+ ?UNSUBACK_PACKET(PacketId, ReasonCodes);
|
|
|
+ _Ver -> ?UNSUBACK_PACKET(PacketId)
|
|
|
+ end,
|
|
|
+ {ok, Unsuback, Channel};
|
|
|
|
|
|
handle_out({disconnect, ReasonCode}, Channel = #channel{protocol = Protocol}) ->
|
|
|
case emqx_protocol:info(proto_ver, Protocol) of
|
|
|
@@ -595,6 +625,12 @@ handle_out({Type, Data}, Channel) ->
|
|
|
%% Handle call
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+handle_call(kick, Channel) ->
|
|
|
+ {stop, {shutdown, kicked}, ok, Channel};
|
|
|
+
|
|
|
+handle_call(discard, Channel) ->
|
|
|
+ {stop, {shutdown, discarded}, ok, Channel};
|
|
|
+
|
|
|
%% Session Takeover
|
|
|
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
|
|
|
{ok, Session, Channel#channel{takeover = true}};
|
|
|
@@ -613,6 +649,13 @@ handle_call(Req, Channel) ->
|
|
|
%% Handle cast
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-spec(handle_cast(Msg :: term(), channel())
|
|
|
+ -> ok | {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
|
+handle_cast({register, Attrs, Stats}, #channel{client = #{client_id := ClientId}}) ->
|
|
|
+ ok = emqx_cm:register_channel(ClientId),
|
|
|
+ emqx_cm:set_chan_attrs(ClientId, Attrs),
|
|
|
+ emqx_cm:set_chan_stats(ClientId, Stats);
|
|
|
+
|
|
|
handle_cast(Msg, Channel) ->
|
|
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
|
{ok, Channel}.
|
|
|
@@ -623,26 +666,24 @@ handle_cast(Msg, Channel) ->
|
|
|
|
|
|
-spec(handle_info(Info :: term(), channel())
|
|
|
-> {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
|
-handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) ->
|
|
|
- TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
|
- [Client, #{'Internal' => true}],
|
|
|
- parse(subscribe, TopicFilters)),
|
|
|
- {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
|
+handle_info({subscribe, RawTopicFilters}, Channel) ->
|
|
|
+ TopicFilters = preprocess_subscribe(#{'Internal' => true},
|
|
|
+ RawTopicFilters, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
-handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
|
|
|
- TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
- [Client, #{'Internal' => true}],
|
|
|
- parse(unsubscribe, TopicFilters)),
|
|
|
- {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
+handle_info({unsubscribe, RawTopicFilters}, Channel) ->
|
|
|
+ TopicFilters = preprocess_unsubscribe(#{'Internal' => true},
|
|
|
+ RawTopicFilters, Channel),
|
|
|
+ {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
-handle_info(sock_closed, Channel = #channel{disconnected = true}) ->
|
|
|
- {ok, Channel};
|
|
|
-handle_info(sock_closed, Channel = #channel{connected = false}) ->
|
|
|
+handle_info(disconnected, Channel = #channel{connected = undefined}) ->
|
|
|
shutdown(closed, Channel);
|
|
|
-handle_info(sock_closed, Channel = #channel{protocol = Protocol,
|
|
|
- session = Session}) ->
|
|
|
+
|
|
|
+handle_info(disconnected, Channel = #channel{protocol = Protocol,
|
|
|
+ session = Session}) ->
|
|
|
+ %% TODO: Why handle will_msg here?
|
|
|
publish_will_msg(emqx_protocol:info(will_msg, Protocol)),
|
|
|
NChannel = Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)},
|
|
|
Interval = emqx_session:info(expiry_interval, Session),
|
|
|
@@ -779,23 +820,20 @@ terminate(Reason, #channel{client = Client,
|
|
|
true -> publish_will_msg(emqx_protocol:info(will_msg, Protocol))
|
|
|
end.
|
|
|
|
|
|
+-spec(received(pos_integer(), channel()) -> channel()).
|
|
|
+received(Oct, Channel) ->
|
|
|
+ ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
|
|
|
+
|
|
|
+-spec(sent(pos_integer(), channel()) -> channel()).
|
|
|
+sent(Oct, Channel) ->
|
|
|
+ ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)).
|
|
|
+
|
|
|
%%TODO: Improve will msg:)
|
|
|
publish_will_msg(undefined) ->
|
|
|
ok;
|
|
|
publish_will_msg(Msg) ->
|
|
|
emqx_broker:publish(Msg).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% GC the channel.
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-gc(_Cnt, _Oct, Channel = #channel{gc_state = undefined}) ->
|
|
|
- Channel;
|
|
|
-gc(Cnt, Oct, Channel = #channel{gc_state = GCSt}) ->
|
|
|
- {Ok, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt),
|
|
|
- Ok andalso emqx_metrics:inc('channel.gc.cnt'),
|
|
|
- Channel#channel{gc_state = GCSt1}.
|
|
|
-
|
|
|
%% @doc Validate incoming packet.
|
|
|
-spec(validate_packet(emqx_types:packet(), channel())
|
|
|
-> ok | {error, emqx_types:reason_code()}).
|
|
|
@@ -899,42 +937,38 @@ init_protocol(ConnPkt, Channel) ->
|
|
|
%% Enrich client
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-enrich_client(ConnPkt, Channel) ->
|
|
|
- pipeline([fun set_username/2,
|
|
|
- fun maybe_use_username_as_clientid/2,
|
|
|
- fun maybe_assign_clientid/2,
|
|
|
- fun set_rest_client_fields/2], ConnPkt, Channel).
|
|
|
-
|
|
|
-maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = #{username := undefined}}) ->
|
|
|
- {ok, Channel};
|
|
|
-maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = Client = #{zone := Zone,
|
|
|
- username := Username}}) ->
|
|
|
- NClient =
|
|
|
- case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
|
|
- true -> Client#{client_id => Username};
|
|
|
- false -> Client
|
|
|
- end,
|
|
|
- {ok, Channel#channel{client = NClient}}.
|
|
|
+enrich_client(ConnPkt = #mqtt_packet_connect{is_bridge = IsBridge},
|
|
|
+ Channel = #channel{client = Client}) ->
|
|
|
+ {ok, NConnPkt, NClient} = pipeline([fun set_username/2,
|
|
|
+ fun maybe_username_as_clientid/2,
|
|
|
+ fun maybe_assign_clientid/2,
|
|
|
+ fun fix_mountpoint/2
|
|
|
+ ], ConnPkt, Client),
|
|
|
+ {ok, NConnPkt, Channel#channel{client = NClient#{is_bridge => IsBridge}}}.
|
|
|
+
|
|
|
+%% Username may be not undefined if peer_cert_as_username
|
|
|
+set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) ->
|
|
|
+ {ok, Client#{username => Username}};
|
|
|
+set_username(_ConnPkt, Client) ->
|
|
|
+ {ok, Client}.
|
|
|
+
|
|
|
+maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) ->
|
|
|
+ {ok, Client};
|
|
|
+maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) ->
|
|
|
+ case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
|
|
|
+ true -> {ok, Client#{client_id => Username}};
|
|
|
+ false -> ok
|
|
|
+ end.
|
|
|
|
|
|
-maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>},
|
|
|
- Channel = #channel{client = Client}) ->
|
|
|
+maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) ->
|
|
|
RandClientId = emqx_guid:to_base62(emqx_guid:gen()),
|
|
|
- {ok, Channel#channel{client = Client#{client_id => RandClientId}}};
|
|
|
-
|
|
|
-maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId},
|
|
|
- Channel = #channel{client = Client}) ->
|
|
|
- {ok, Channel#channel{client = Client#{client_id => ClientId}}}.
|
|
|
-
|
|
|
-%% Username maybe not undefined if peer_cert_as_username
|
|
|
-set_username(#mqtt_packet_connect{username = Username},
|
|
|
- Channel = #channel{client = Client = #{username := undefined}}) ->
|
|
|
- {ok, Channel#channel{client = Client#{username => Username}}};
|
|
|
-set_username(_ConnPkt, Channel) ->
|
|
|
- {ok, Channel}.
|
|
|
+ {ok, Client#{client_id => RandClientId}};
|
|
|
+maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) ->
|
|
|
+ {ok, Client#{client_id => ClientId}}.
|
|
|
|
|
|
-set_rest_client_fields(#mqtt_packet_connect{is_bridge = IsBridge},
|
|
|
- Channel = #channel{client = Client}) ->
|
|
|
- {ok, Channel#channel{client = Client#{is_bridge => IsBridge}}}.
|
|
|
+fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) -> ok;
|
|
|
+fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) ->
|
|
|
+ {ok, Client#{mountpoint := emqx_mountpoint:replvar(Mountpoint, Client)}}.
|
|
|
|
|
|
%% @doc Set logger metadata.
|
|
|
set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
|
|
|
@@ -1016,7 +1050,8 @@ check_publish(Packet, Channel) ->
|
|
|
%% Check Pub ACL
|
|
|
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
|
|
#channel{client = Client}) ->
|
|
|
- case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of
|
|
|
+ case is_acl_enabled(Client) andalso
|
|
|
+ emqx_access_control:check_acl(Client, publish, Topic) of
|
|
|
false -> ok;
|
|
|
allow -> ok;
|
|
|
deny -> {error, ?RC_NOT_AUTHORIZED}
|
|
|
@@ -1057,7 +1092,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) ->
|
|
|
%% Check Sub ACL
|
|
|
check_sub_acl(TopicFilter, #channel{client = Client}) ->
|
|
|
case is_acl_enabled(Client) andalso
|
|
|
- check_acl(Client, subscribe, TopicFilter) of
|
|
|
+ emqx_access_control:check_acl(Client, subscribe, TopicFilter) of
|
|
|
false -> allow;
|
|
|
Result -> Result
|
|
|
end.
|
|
|
@@ -1116,10 +1151,10 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId},
|
|
|
end.
|
|
|
|
|
|
ensure_connected(Channel) ->
|
|
|
- Channel#channel{connected = true, connected_at = os:timestamp(), disconnected = false}.
|
|
|
+ Channel#channel{connected = true, connected_at = os:timestamp(), disconnected_at = undefined}.
|
|
|
|
|
|
ensure_disconnected(Channel) ->
|
|
|
- Channel#channel{connected = false, disconnected_at = os:timestamp(), disconnected = true}.
|
|
|
+ Channel#channel{connected = false, disconnected_at = os:timestamp()}.
|
|
|
|
|
|
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
|
|
ensure_keepalive_timer(Interval, Channel);
|
|
|
@@ -1147,53 +1182,32 @@ maybe_resume_session(#channel{session = Session,
|
|
|
{ok, lists:append(Publishes, More), Session2}
|
|
|
end.
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Is ACL enabled?
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
+%% @doc Is ACL enabled?
|
|
|
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
|
|
|
(not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true).
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Parse Topic Filters
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-parse(subscribe, TopicFilters) ->
|
|
|
- [emqx_topic:parse(TopicFilter, SubOpts) || {TopicFilter, SubOpts} <- TopicFilters];
|
|
|
-
|
|
|
-parse(unsubscribe, TopicFilters) ->
|
|
|
+%% @doc Parse Topic Filters
|
|
|
+-compile({inline, [parse_topic_filters/1]}).
|
|
|
+parse_topic_filters(TopicFilters) ->
|
|
|
lists:map(fun emqx_topic:parse/1, TopicFilters).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Mount/Unmount
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-mount(Client = #{mountpoint := MountPoint}, TopicOrMsg) ->
|
|
|
- emqx_mountpoint:mount(
|
|
|
- emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg).
|
|
|
-
|
|
|
-unmount(Client = #{mountpoint := MountPoint}, TopicOrMsg) ->
|
|
|
- emqx_mountpoint:unmount(
|
|
|
- emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Pipeline
|
|
|
+%% Maybe GC and Check OOM
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-pipeline([], Packet, Channel) ->
|
|
|
- {ok, Packet, Channel};
|
|
|
+maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) ->
|
|
|
+ Channel;
|
|
|
+maybe_gc_and_check_oom(Oct, Channel = #channel{gc_state = GCSt,
|
|
|
+ oom_policy = OomPolicy}) ->
|
|
|
+ {IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt),
|
|
|
+ IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
|
|
|
+ IsGC andalso maybe_apply(fun check_oom/1, OomPolicy),
|
|
|
+ Channel#channel{gc_state = GCSt1}.
|
|
|
|
|
|
-pipeline([Fun|More], Packet, Channel) ->
|
|
|
- case Fun(Packet, Channel) of
|
|
|
- ok -> pipeline(More, Packet, Channel);
|
|
|
- {ok, NChannel} ->
|
|
|
- pipeline(More, Packet, NChannel);
|
|
|
- {ok, NPacket, NChannel} ->
|
|
|
- pipeline(More, NPacket, NChannel);
|
|
|
- {error, ReasonCode} ->
|
|
|
- {error, ReasonCode, Channel};
|
|
|
- {error, ReasonCode, NChannel} ->
|
|
|
- {error, ReasonCode, NChannel}
|
|
|
+check_oom(OomPolicy) ->
|
|
|
+ case emqx_oom:check(OomPolicy) of
|
|
|
+ ok -> ok;
|
|
|
+ Shutdown -> self() ! Shutdown
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|