|
@@ -60,7 +60,7 @@
|
|
|
%% MQTT ConnInfo
|
|
%% MQTT ConnInfo
|
|
|
conninfo :: emqx_types:conninfo(),
|
|
conninfo :: emqx_types:conninfo(),
|
|
|
%% MQTT ClientInfo
|
|
%% MQTT ClientInfo
|
|
|
- client :: emqx_types:client(),
|
|
|
|
|
|
|
+ client_info :: emqx_types:client_info(),
|
|
|
%% MQTT Session
|
|
%% MQTT Session
|
|
|
session :: emqx_session:session(),
|
|
session :: emqx_session:session(),
|
|
|
%% Keepalive
|
|
%% Keepalive
|
|
@@ -102,7 +102,7 @@
|
|
|
will_timer => will_message
|
|
will_timer => will_message
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
--define(ATTR_KEYS, [conninfo, client, session, connected, connected_at, disconnected_at]).
|
|
|
|
|
|
|
+-define(ATTR_KEYS, [conninfo, client_info, session, connected, connected_at, disconnected_at]).
|
|
|
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, topic_aliases, alias_maximum, gc_state, disconnected_at]).
|
|
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, topic_aliases, alias_maximum, gc_state, disconnected_at]).
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
@@ -119,7 +119,7 @@ info(Keys, Channel) when is_list(Keys) ->
|
|
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
|
[{Key, info(Key, Channel)} || Key <- Keys];
|
|
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
|
info(conninfo, #channel{conninfo = ConnInfo}) ->
|
|
|
ConnInfo;
|
|
ConnInfo;
|
|
|
-info(client, #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+info(client_info, #channel{client_info = ClientInfo}) ->
|
|
|
ClientInfo;
|
|
ClientInfo;
|
|
|
info(session, #channel{session = Session}) ->
|
|
info(session, #channel{session = Session}) ->
|
|
|
maybe_apply(fun emqx_session:info/1, Session);
|
|
maybe_apply(fun emqx_session:info/1, Session);
|
|
@@ -158,7 +158,7 @@ stats(#channel{session = Session}) ->
|
|
|
emqx_session:stats(Session).
|
|
emqx_session:stats(Session).
|
|
|
|
|
|
|
|
-spec(caps(channel()) -> emqx_types:caps()).
|
|
-spec(caps(channel()) -> emqx_types:caps()).
|
|
|
-caps(#channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+caps(#channel{client_info = #{zone := Zone}}) ->
|
|
|
emqx_mqtt_caps:get_caps(Zone).
|
|
emqx_mqtt_caps:get_caps(Zone).
|
|
|
|
|
|
|
|
%% For tests
|
|
%% For tests
|
|
@@ -196,15 +196,15 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, protocol := Protocol}, Options)
|
|
|
true -> undefined;
|
|
true -> undefined;
|
|
|
false -> disabled
|
|
false -> disabled
|
|
|
end,
|
|
end,
|
|
|
- #channel{conninfo = ConnInfo,
|
|
|
|
|
- client = ClientInfo,
|
|
|
|
|
- gc_state = init_gc_state(Zone),
|
|
|
|
|
- oom_policy = init_oom_policy(Zone),
|
|
|
|
|
- timers = #{stats_timer => StatsTimer},
|
|
|
|
|
- connected = undefined,
|
|
|
|
|
- takeover = false,
|
|
|
|
|
- resuming = false,
|
|
|
|
|
- pendings = []
|
|
|
|
|
|
|
+ #channel{conninfo = ConnInfo,
|
|
|
|
|
+ client_info = ClientInfo,
|
|
|
|
|
+ gc_state = init_gc_state(Zone),
|
|
|
|
|
+ oom_policy = init_oom_policy(Zone),
|
|
|
|
|
+ timers = #{stats_timer => StatsTimer},
|
|
|
|
|
+ connected = undefined,
|
|
|
|
|
+ takeover = false,
|
|
|
|
|
+ resuming = false,
|
|
|
|
|
+ pendings = []
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
peer_cert_as_username(Options) ->
|
|
peer_cert_as_username(Options) ->
|
|
@@ -252,7 +252,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
|
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
|
|
- Channel = #channel{client = ClientInfo, session = Session}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:puback(PacketId, Session) of
|
|
case emqx_session:puback(PacketId, Session) of
|
|
|
{ok, Msg, Publishes, NSession} ->
|
|
{ok, Msg, Publishes, NSession} ->
|
|
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
|
@@ -271,7 +271,7 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode),
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
|
handle_in(?PUBREC_PACKET(PacketId, _ReasonCode),
|
|
|
- Channel = #channel{client = ClientInfo, session = Session}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = ClientInfo, session = Session}) ->
|
|
|
case emqx_session:pubrec(PacketId, Session) of
|
|
case emqx_session:pubrec(PacketId, Session) of
|
|
|
{ok, Msg, NSession} ->
|
|
{ok, Msg, NSession} ->
|
|
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
|
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
|
|
@@ -310,7 +310,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
- Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = ClientInfo}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
case emqx_packet:check(Packet) of
|
|
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
|
[ClientInfo, Properties],
|
|
[ClientInfo, Properties],
|
|
@@ -323,7 +323,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
- Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = ClientInfo}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
case emqx_packet:check(Packet) of
|
|
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
[ClientInfo, Properties],
|
|
[ClientInfo, Properties],
|
|
@@ -369,7 +369,7 @@ handle_in(Packet, Channel) ->
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|
|
- Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
|
|
|
|
|
|
+ Channel = #channel{conninfo = ConnInfo, client_info = ClientInfo}) ->
|
|
|
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
|
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
|
|
{ok, #{session := Session, present := false}} ->
|
|
{ok, #{session := Session, present := false}} ->
|
|
|
NChannel = Channel#channel{session = Session},
|
|
NChannel = Channel#channel{session = Session},
|
|
@@ -440,7 +440,7 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
|
publish_to_msg(Packet, #channel{conninfo = #{proto_ver := ProtoVer},
|
|
|
- client = ClientInfo = #{mountpoint := MountPoint}}) ->
|
|
|
|
|
|
|
+ client_info = ClientInfo = #{mountpoint := MountPoint}}) ->
|
|
|
Msg = emqx_packet:to_message(ClientInfo, Packet),
|
|
Msg = emqx_packet:to_message(ClientInfo, Packet),
|
|
|
Msg1 = emqx_message:set_flag(dup, false, Msg),
|
|
Msg1 = emqx_message:set_flag(dup, false, Msg),
|
|
|
Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1),
|
|
Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1),
|
|
@@ -461,7 +461,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
process_subscribe(More, [RC|Acc], NChannel).
|
|
process_subscribe(More, [RC|Acc], NChannel).
|
|
|
|
|
|
|
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
|
|
- #channel{client = ClientInfo = #{mountpoint := MountPoint},
|
|
|
|
|
|
|
+ #channel{client_info = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
session = Session}) ->
|
|
|
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
|
case check_subscribe(TopicFilter, SubOpts, Channel) of
|
|
|
ok ->
|
|
ok ->
|
|
@@ -491,7 +491,7 @@ process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) ->
|
|
|
process_unsubscribe(More, [RC|Acc], NChannel).
|
|
process_unsubscribe(More, [RC|Acc], NChannel).
|
|
|
|
|
|
|
|
do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
- #channel{client = ClientInfo = #{mountpoint := MountPoint},
|
|
|
|
|
|
|
+ #channel{client_info = ClientInfo = #{mountpoint := MountPoint},
|
|
|
session = Session}) ->
|
|
session = Session}) ->
|
|
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
|
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of
|
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of
|
|
@@ -506,7 +506,7 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
|
|
|
|
|
|
|
%%TODO: RunFold or Pipeline
|
|
%%TODO: RunFold or Pipeline
|
|
|
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
|
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
|
|
- Channel = #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
|
|
|
|
|
|
+ Channel = #channel{conninfo = ConnInfo, client_info = ClientInfo}) ->
|
|
|
AckProps = run_fold([fun enrich_caps/2,
|
|
AckProps = run_fold([fun enrich_caps/2,
|
|
|
fun enrich_server_keepalive/2,
|
|
fun enrich_server_keepalive/2,
|
|
|
fun enrich_assigned_clientid/2
|
|
fun enrich_assigned_clientid/2
|
|
@@ -531,7 +531,7 @@ handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
|
|
handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo,
|
|
|
- client = ClientInfo}) ->
|
|
|
|
|
|
|
+ client_info = ClientInfo}) ->
|
|
|
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
|
|
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
|
|
|
ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
|
|
ReasonCode1 = case ProtoVer = maps:get(proto_ver, ConnInfo) of
|
|
|
?MQTT_PROTO_V5 -> ReasonCode;
|
|
?MQTT_PROTO_V5 -> ReasonCode;
|
|
@@ -572,11 +572,11 @@ handle_out({publish, Publishes}, Channel) when is_list(Publishes) ->
|
|
|
%% Ignore loop deliver
|
|
%% Ignore loop deliver
|
|
|
handle_out({publish, _PacketId, #message{from = ClientId,
|
|
handle_out({publish, _PacketId, #message{from = ClientId,
|
|
|
flags = #{nl := true}}},
|
|
flags = #{nl := true}}},
|
|
|
- Channel = #channel{client = #{client_id := ClientId}}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = #{client_id := ClientId}}) ->
|
|
|
{ok, Channel};
|
|
{ok, Channel};
|
|
|
|
|
|
|
|
handle_out({publish, PacketId, Msg}, Channel =
|
|
handle_out({publish, PacketId, Msg}, Channel =
|
|
|
- #channel{client = ClientInfo = #{mountpoint := MountPoint}}) ->
|
|
|
|
|
|
|
+ #channel{client_info = ClientInfo = #{mountpoint := MountPoint}}) ->
|
|
|
Msg1 = emqx_message:update_expiry(Msg),
|
|
Msg1 = emqx_message:update_expiry(Msg),
|
|
|
Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1),
|
|
Msg2 = emqx_hooks:run_fold('message.delivered', [ClientInfo], Msg1),
|
|
|
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
|
|
Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2),
|
|
@@ -657,7 +657,7 @@ handle_call(Req, Channel) ->
|
|
|
|
|
|
|
|
-spec(handle_cast(Msg :: term(), channel())
|
|
-spec(handle_cast(Msg :: term(), channel())
|
|
|
-> ok | {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
-> ok | {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
|
-handle_cast({register, Attrs, Stats}, #channel{client = #{client_id := ClientId}}) ->
|
|
|
|
|
|
|
+handle_cast({register, Attrs, Stats}, #channel{client_info = #{client_id := ClientId}}) ->
|
|
|
ok = emqx_cm:register_channel(ClientId),
|
|
ok = emqx_cm:register_channel(ClientId),
|
|
|
emqx_cm:set_chan_attrs(ClientId, Attrs),
|
|
emqx_cm:set_chan_attrs(ClientId, Attrs),
|
|
|
emqx_cm:set_chan_stats(ClientId, Stats);
|
|
emqx_cm:set_chan_stats(ClientId, Stats);
|
|
@@ -672,14 +672,14 @@ handle_cast(Msg, Channel) ->
|
|
|
|
|
|
|
|
-spec(handle_info(Info :: term(), channel())
|
|
-spec(handle_info(Info :: term(), channel())
|
|
|
-> {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
-> {ok, channel()} | {stop, Reason :: term(), channel()}).
|
|
|
-handle_info({subscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+handle_info({subscribe, TopicFilters}, Channel = #channel{client_info = ClientInfo}) ->
|
|
|
TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)),
|
|
parse_topic_filters(TopicFilters)),
|
|
|
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
|
{ok, NChannel};
|
|
{ok, NChannel};
|
|
|
|
|
|
|
|
-handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+handle_info({unsubscribe, TopicFilters}, Channel = #channel{client_info = ClientInfo}) ->
|
|
|
TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
[ClientInfo, #{'Internal' => true}],
|
|
[ClientInfo, #{'Internal' => true}],
|
|
|
parse_topic_filters(TopicFilters)),
|
|
parse_topic_filters(TopicFilters)),
|
|
@@ -693,7 +693,7 @@ handle_info(disconnected, Channel = #channel{connected = false}) ->
|
|
|
{ok, Channel};
|
|
{ok, Channel};
|
|
|
|
|
|
|
|
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval},
|
|
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval},
|
|
|
- client = ClientInfo = #{zone := Zone},
|
|
|
|
|
|
|
+ client_info = ClientInfo = #{zone := Zone},
|
|
|
will_msg = WillMsg}) ->
|
|
will_msg = WillMsg}) ->
|
|
|
emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
|
|
emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo),
|
|
|
Channel1 = ensure_disconnected(Channel),
|
|
Channel1 = ensure_disconnected(Channel),
|
|
@@ -726,7 +726,7 @@ handle_info(Info, Channel) ->
|
|
|
| {ok, Result :: term(), channel()}
|
|
| {ok, Result :: term(), channel()}
|
|
|
| {stop, Reason :: term(), channel()}).
|
|
| {stop, Reason :: term(), channel()}).
|
|
|
handle_timeout(TRef, {emit_stats, Stats},
|
|
handle_timeout(TRef, {emit_stats, Stats},
|
|
|
- Channel = #channel{client = #{client_id := ClientId},
|
|
|
|
|
|
|
+ Channel = #channel{client_info = #{client_id := ClientId},
|
|
|
timers = #{stats_timer := TRef}}) ->
|
|
timers = #{stats_timer := TRef}}) ->
|
|
|
ok = emqx_cm:set_chan_stats(ClientId, Stats),
|
|
ok = emqx_cm:set_chan_stats(ClientId, Stats),
|
|
|
{ok, clean_timer(stats_timer, Channel)};
|
|
{ok, clean_timer(stats_timer, Channel)};
|
|
@@ -810,7 +810,7 @@ reset_timer(Name, Time, Channel) ->
|
|
|
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
|
|
Channel#channel{timers = maps:remove(Name, Timers)}.
|
|
Channel#channel{timers = maps:remove(Name, Timers)}.
|
|
|
|
|
|
|
|
-interval(stats_timer, #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+interval(stats_timer, #channel{client_info = #{zone := Zone}}) ->
|
|
|
emqx_zone:get_env(Zone, idle_timeout, 30000);
|
|
emqx_zone:get_env(Zone, idle_timeout, 30000);
|
|
|
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
|
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
|
|
emqx_keepalive:info(interval, KeepAlive);
|
|
emqx_keepalive:info(interval, KeepAlive);
|
|
@@ -834,12 +834,12 @@ will_delay_interval(WillMsg) ->
|
|
|
|
|
|
|
|
terminate(_, #channel{connected = undefined}) ->
|
|
terminate(_, #channel{connected = undefined}) ->
|
|
|
ok;
|
|
ok;
|
|
|
-terminate(normal, #channel{conninfo = ConnInfo, client = ClientInfo}) ->
|
|
|
|
|
|
|
+terminate(normal, #channel{conninfo = ConnInfo, client_info = ClientInfo}) ->
|
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
|
|
-terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client = ClientInfo})
|
|
|
|
|
|
|
+terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client_info = ClientInfo})
|
|
|
when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
|
|
when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered ->
|
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]);
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]);
|
|
|
-terminate(Reason, #channel{conninfo = ConnInfo, client = ClientInfo, will_msg = WillMsg}) ->
|
|
|
|
|
|
|
+terminate(Reason, #channel{conninfo = ConnInfo, client_info = ClientInfo, will_msg = WillMsg}) ->
|
|
|
publish_will_msg(WillMsg),
|
|
publish_will_msg(WillMsg),
|
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
|
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]).
|
|
|
|
|
|
|
@@ -866,7 +866,7 @@ enrich_conninfo(#mqtt_packet_connect{
|
|
|
properties = ConnProps,
|
|
properties = ConnProps,
|
|
|
client_id = ClientId,
|
|
client_id = ClientId,
|
|
|
username = Username}, Channel) ->
|
|
username = Username}, Channel) ->
|
|
|
- #channel{conninfo = ConnInfo, client = #{zone := Zone}} = Channel,
|
|
|
|
|
|
|
+ #channel{conninfo = ConnInfo, client_info = #{zone := Zone}} = Channel,
|
|
|
MaxInflight = emqx_mqtt_props:get('Receive-Maximum',
|
|
MaxInflight = emqx_mqtt_props:get('Receive-Maximum',
|
|
|
ConnProps, emqx_zone:max_inflight(Zone)),
|
|
ConnProps, emqx_zone:max_inflight(Zone)),
|
|
|
Interval = if ProtoVer == ?MQTT_PROTO_V5 ->
|
|
Interval = if ProtoVer == ?MQTT_PROTO_V5 ->
|
|
@@ -889,18 +889,18 @@ enrich_conninfo(#mqtt_packet_connect{
|
|
|
{ok, Channel#channel{conninfo = NConnInfo}}.
|
|
{ok, Channel#channel{conninfo = NConnInfo}}.
|
|
|
|
|
|
|
|
%% @doc Check connect packet.
|
|
%% @doc Check connect packet.
|
|
|
-check_connect(ConnPkt, #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+check_connect(ConnPkt, #channel{client_info = #{zone := Zone}}) ->
|
|
|
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
|
|
emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)).
|
|
|
|
|
|
|
|
%% @doc Enrich client
|
|
%% @doc Enrich client
|
|
|
-enrich_client(ConnPkt, Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+enrich_client(ConnPkt, Channel = #channel{client_info = ClientInfo}) ->
|
|
|
{ok, NConnPkt, NClientInfo} =
|
|
{ok, NConnPkt, NClientInfo} =
|
|
|
pipeline([fun set_username/2,
|
|
pipeline([fun set_username/2,
|
|
|
fun set_bridge_mode/2,
|
|
fun set_bridge_mode/2,
|
|
|
fun maybe_username_as_clientid/2,
|
|
fun maybe_username_as_clientid/2,
|
|
|
fun maybe_assign_clientid/2,
|
|
fun maybe_assign_clientid/2,
|
|
|
fun fix_mountpoint/2], ConnPkt, ClientInfo),
|
|
fun fix_mountpoint/2], ConnPkt, ClientInfo),
|
|
|
- {ok, NConnPkt, Channel#channel{client = NClientInfo}}.
|
|
|
|
|
|
|
+ {ok, NConnPkt, Channel#channel{client_info = NClientInfo}}.
|
|
|
|
|
|
|
|
set_username(#mqtt_packet_connect{username = Username},
|
|
set_username(#mqtt_packet_connect{username = Username},
|
|
|
ClientInfo = #{username := undefined}) ->
|
|
ClientInfo = #{username := undefined}) ->
|
|
@@ -931,20 +931,20 @@ fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := Mountpoint}) ->
|
|
|
{ok, ClientInfo#{mountpoint := emqx_mountpoint:replvar(Mountpoint, ClientInfo)}}.
|
|
{ok, ClientInfo#{mountpoint := emqx_mountpoint:replvar(Mountpoint, ClientInfo)}}.
|
|
|
|
|
|
|
|
%% @doc Set logger metadata.
|
|
%% @doc Set logger metadata.
|
|
|
-set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) ->
|
|
|
|
|
|
|
+set_logger_meta(_ConnPkt, #channel{client_info = #{client_id := ClientId}}) ->
|
|
|
emqx_logger:set_metadata_client_id(ClientId).
|
|
emqx_logger:set_metadata_client_id(ClientId).
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
%% Check banned/flapping
|
|
%% Check banned/flapping
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
-check_banned(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+check_banned(_ConnPkt, #channel{client_info = ClientInfo = #{zone := Zone}}) ->
|
|
|
case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of
|
|
case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of
|
|
|
true -> {error, ?RC_BANNED};
|
|
true -> {error, ?RC_BANNED};
|
|
|
false -> ok
|
|
false -> ok
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+check_flapping(_ConnPkt, #channel{client_info = ClientInfo = #{zone := Zone}}) ->
|
|
|
case emqx_zone:enable_flapping_detect(Zone)
|
|
case emqx_zone:enable_flapping_detect(Zone)
|
|
|
andalso emqx_flapping:check(ClientInfo) of
|
|
andalso emqx_flapping:check(ClientInfo) of
|
|
|
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
|
|
true -> {error, ?RC_CONNECTION_RATE_EXCEEDED};
|
|
@@ -958,10 +958,10 @@ check_flapping(_ConnPkt, #channel{client = ClientInfo = #{zone := Zone}}) ->
|
|
|
auth_connect(#mqtt_packet_connect{client_id = ClientId,
|
|
auth_connect(#mqtt_packet_connect{client_id = ClientId,
|
|
|
username = Username,
|
|
username = Username,
|
|
|
password = Password},
|
|
password = Password},
|
|
|
- Channel = #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+ Channel = #channel{client_info = ClientInfo}) ->
|
|
|
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
|
|
case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
|
|
|
{ok, AuthResult} ->
|
|
{ok, AuthResult} ->
|
|
|
- {ok, Channel#channel{client = maps:merge(ClientInfo, AuthResult)}};
|
|
|
|
|
|
|
+ {ok, Channel#channel{client_info = maps:merge(ClientInfo, AuthResult)}};
|
|
|
{error, Reason} ->
|
|
{error, Reason} ->
|
|
|
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
|
|
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
|
|
|
[ClientId, Username, Reason]),
|
|
[ClientId, Username, Reason]),
|
|
@@ -1004,7 +1004,7 @@ save_alias(AliasId, Topic, Aliases) -> maps:put(AliasId, Topic, Aliases).
|
|
|
|
|
|
|
|
%% Check Pub ACL
|
|
%% Check Pub ACL
|
|
|
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
|
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}},
|
|
|
- #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+ #channel{client_info = ClientInfo}) ->
|
|
|
case is_acl_enabled(ClientInfo) andalso
|
|
case is_acl_enabled(ClientInfo) andalso
|
|
|
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
|
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
|
|
false -> ok;
|
|
false -> ok;
|
|
@@ -1033,7 +1033,7 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
|
|
|
retain = Retain
|
|
retain = Retain
|
|
|
}
|
|
}
|
|
|
},
|
|
},
|
|
|
- #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+ #channel{client_info = #{zone := Zone}}) ->
|
|
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
|
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
|
|
|
|
|
|
|
|
%% Check Sub
|
|
%% Check Sub
|
|
@@ -1044,7 +1044,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
%% Check Sub ACL
|
|
%% Check Sub ACL
|
|
|
-check_sub_acl(TopicFilter, #channel{client = ClientInfo}) ->
|
|
|
|
|
|
|
+check_sub_acl(TopicFilter, #channel{client_info = ClientInfo}) ->
|
|
|
case is_acl_enabled(ClientInfo) andalso
|
|
case is_acl_enabled(ClientInfo) andalso
|
|
|
emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of
|
|
emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of
|
|
|
false -> allow;
|
|
false -> allow;
|
|
@@ -1052,7 +1052,7 @@ check_sub_acl(TopicFilter, #channel{client = ClientInfo}) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
%% Check Sub Caps
|
|
%% Check Sub Caps
|
|
|
-check_sub_caps(TopicFilter, SubOpts, #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+check_sub_caps(TopicFilter, SubOpts, #channel{client_info = #{zone := Zone}}) ->
|
|
|
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
|
|
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
|
|
|
|
|
|
|
|
enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) ->
|
|
@@ -1063,12 +1063,12 @@ enrich_subid(_Properties, TopicFilters) ->
|
|
|
enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
|
enrich_subopts(SubOpts, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
|
|
|
SubOpts;
|
|
SubOpts;
|
|
|
|
|
|
|
|
-enrich_subopts(SubOpts, #channel{client = #{zone := Zone, is_bridge := IsBridge}}) ->
|
|
|
|
|
|
|
+enrich_subopts(SubOpts, #channel{client_info = #{zone := Zone, is_bridge := IsBridge}}) ->
|
|
|
NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
|
|
NL = flag(emqx_zone:ignore_loop_deliver(Zone)),
|
|
|
SubOpts#{rap => flag(IsBridge), nl => NL}.
|
|
SubOpts#{rap => flag(IsBridge), nl => NL}.
|
|
|
|
|
|
|
|
enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
|
enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
|
|
- client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+ client_info = #{zone := Zone}}) ->
|
|
|
#{max_packet_size := MaxPktSize,
|
|
#{max_packet_size := MaxPktSize,
|
|
|
max_qos_allowed := MaxQoS,
|
|
max_qos_allowed := MaxQoS,
|
|
|
retain_available := Retain,
|
|
retain_available := Retain,
|
|
@@ -1087,14 +1087,14 @@ enrich_caps(AckProps, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5},
|
|
|
enrich_caps(AckProps, _Channel) ->
|
|
enrich_caps(AckProps, _Channel) ->
|
|
|
AckProps.
|
|
AckProps.
|
|
|
|
|
|
|
|
-enrich_server_keepalive(AckProps, #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+enrich_server_keepalive(AckProps, #channel{client_info = #{zone := Zone}}) ->
|
|
|
case emqx_zone:server_keepalive(Zone) of
|
|
case emqx_zone:server_keepalive(Zone) of
|
|
|
undefined -> AckProps;
|
|
undefined -> AckProps;
|
|
|
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
|
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
|
enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
|
|
- client = #{client_id := ClientId}
|
|
|
|
|
|
|
+ client_info = #{client_id := ClientId}
|
|
|
}) ->
|
|
}) ->
|
|
|
case maps:get(client_id, ConnInfo) of
|
|
case maps:get(client_id, ConnInfo) of
|
|
|
<<>> -> %% Original ClientId is null.
|
|
<<>> -> %% Original ClientId is null.
|
|
@@ -1117,7 +1117,7 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
|
|
ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel).
|
|
|
|
|
|
|
|
ensure_keepalive_timer(0, Channel) -> Channel;
|
|
ensure_keepalive_timer(0, Channel) -> Channel;
|
|
|
-ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) ->
|
|
|
|
|
|
|
+ensure_keepalive_timer(Interval, Channel = #channel{client_info = #{zone := Zone}}) ->
|
|
|
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
|
|
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
|
|
|
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
|
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)),
|
|
|
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
|
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|