|
|
@@ -64,7 +64,8 @@
|
|
|
send_stats,
|
|
|
connected,
|
|
|
connected_at,
|
|
|
- ignore_loop
|
|
|
+ ignore_loop,
|
|
|
+ topic_alias_maximum
|
|
|
}).
|
|
|
|
|
|
-type(state() :: #pstate{}).
|
|
|
@@ -84,28 +85,29 @@
|
|
|
-spec(init(map(), list()) -> state()).
|
|
|
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
|
|
|
Zone = proplists:get_value(zone, Options),
|
|
|
- #pstate{zone = Zone,
|
|
|
- sendfun = SendFun,
|
|
|
- peername = Peername,
|
|
|
- peercert = Peercert,
|
|
|
- proto_ver = ?MQTT_PROTO_V4,
|
|
|
- proto_name = <<"MQTT">>,
|
|
|
- client_id = <<>>,
|
|
|
- is_assigned = false,
|
|
|
- conn_pid = self(),
|
|
|
- username = init_username(Peercert, Options),
|
|
|
- is_super = false,
|
|
|
- clean_start = false,
|
|
|
- topic_aliases = #{},
|
|
|
- packet_size = emqx_zone:get_env(Zone, max_packet_size),
|
|
|
- mountpoint = emqx_zone:get_env(Zone, mountpoint),
|
|
|
- is_bridge = false,
|
|
|
- enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
|
|
|
- enable_acl = emqx_zone:get_env(Zone, enable_acl),
|
|
|
- recv_stats = #{msg => 0, pkt => 0},
|
|
|
- send_stats = #{msg => 0, pkt => 0},
|
|
|
- connected = false,
|
|
|
- ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.
|
|
|
+ #pstate{zone = Zone,
|
|
|
+ sendfun = SendFun,
|
|
|
+ peername = Peername,
|
|
|
+ peercert = Peercert,
|
|
|
+ proto_ver = ?MQTT_PROTO_V4,
|
|
|
+ proto_name = <<"MQTT">>,
|
|
|
+ client_id = <<>>,
|
|
|
+ is_assigned = false,
|
|
|
+ conn_pid = self(),
|
|
|
+ username = init_username(Peercert, Options),
|
|
|
+ is_super = false,
|
|
|
+ clean_start = false,
|
|
|
+ topic_aliases = #{},
|
|
|
+ packet_size = emqx_zone:get_env(Zone, max_packet_size),
|
|
|
+ mountpoint = emqx_zone:get_env(Zone, mountpoint),
|
|
|
+ is_bridge = false,
|
|
|
+ enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
|
|
|
+ enable_acl = emqx_zone:get_env(Zone, enable_acl),
|
|
|
+ recv_stats = #{msg => 0, pkt => 0},
|
|
|
+ send_stats = #{msg => 0, pkt => 0},
|
|
|
+ connected = false,
|
|
|
+ ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false),
|
|
|
+ topic_alias_maximum = #{to_client => 0, from_client => 0}}.
|
|
|
|
|
|
init_username(Peercert, Options) ->
|
|
|
case proplists:get_value(peer_cert_as_username, Options) of
|
|
|
@@ -212,12 +214,16 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
|
|
|
{error, proto_unexpected_connect, PState};
|
|
|
|
|
|
received(Packet = ?PACKET(Type), PState) ->
|
|
|
- PState1 = set_protover(Packet, PState),
|
|
|
+ PState1 = set_protover(Packet, PState),
|
|
|
trace(recv, Packet),
|
|
|
try emqx_packet:validate(Packet) of
|
|
|
true ->
|
|
|
- {Packet1, PState2} = preprocess_properties(Packet, PState1),
|
|
|
- process_packet(Packet1, inc_stats(recv, Type, PState2))
|
|
|
+ case preprocess_properties(Packet, PState1) of
|
|
|
+ {error, ReasonCode} ->
|
|
|
+ {error, ReasonCode, PState1};
|
|
|
+ {Packet1, PState2} ->
|
|
|
+ process_packet(Packet1, inc_stats(recv, Type, PState2))
|
|
|
+ end
|
|
|
catch
|
|
|
error : protocol_error ->
|
|
|
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1),
|
|
|
@@ -242,6 +248,13 @@ received(Packet = ?PACKET(Type), PState) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Preprocess MQTT Properties
|
|
|
%%------------------------------------------------------------------------------
|
|
|
+preprocess_properties(Packet = #mqtt_packet{
|
|
|
+ variable = #mqtt_packet_connect{
|
|
|
+ properties = #{'Topic-Alias-Maximum' := ToClient}
|
|
|
+ }
|
|
|
+ },
|
|
|
+ PState = #pstate{topic_alias_maximum = TopicAliasMaximum}) ->
|
|
|
+ {Packet, PState#pstate{topic_alias_maximum = TopicAliasMaximum#{to_client => ToClient}}};
|
|
|
|
|
|
%% Subscription Identifier
|
|
|
preprocess_properties(Packet = #mqtt_packet{
|
|
|
@@ -255,22 +268,46 @@ preprocess_properties(Packet = #mqtt_packet{
|
|
|
{Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState};
|
|
|
|
|
|
%% Topic Alias Mapping
|
|
|
+preprocess_properties(#mqtt_packet{
|
|
|
+ variable = #mqtt_packet_publish{
|
|
|
+ properties = #{'Topic-Alias' := 0}}
|
|
|
+ },
|
|
|
+ PState) ->
|
|
|
+ deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
|
|
|
+ {error, ?RC_TOPIC_ALIAS_INVALID};
|
|
|
+
|
|
|
preprocess_properties(Packet = #mqtt_packet{
|
|
|
variable = Publish = #mqtt_packet_publish{
|
|
|
topic_name = <<>>,
|
|
|
properties = #{'Topic-Alias' := AliasId}}
|
|
|
},
|
|
|
- PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
|
|
|
- {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
|
|
|
- topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
|
|
|
+ PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
|
|
|
+ topic_aliases = Aliases,
|
|
|
+ topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
|
|
|
+ case AliasId =< TopicAliasMaximum of
|
|
|
+ true ->
|
|
|
+ {Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
|
|
|
+ topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
|
|
|
+ false ->
|
|
|
+ deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
|
|
|
+ {error, ?RC_TOPIC_ALIAS_INVALID}
|
|
|
+ end;
|
|
|
|
|
|
preprocess_properties(Packet = #mqtt_packet{
|
|
|
- variable = #mqtt_packet_publish{
|
|
|
- topic_name = Topic,
|
|
|
- properties = #{'Topic-Alias' := AliasId}}
|
|
|
- },
|
|
|
- PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
|
|
|
- {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
|
|
|
+ variable = #mqtt_packet_publish{
|
|
|
+ topic_name = Topic,
|
|
|
+ properties = #{'Topic-Alias' := AliasId}}
|
|
|
+ },
|
|
|
+ PState = #pstate{proto_ver = ?MQTT_PROTO_V5,
|
|
|
+ topic_aliases = Aliases,
|
|
|
+ topic_alias_maximum = #{from_client := TopicAliasMaximum}}) ->
|
|
|
+ case AliasId =< TopicAliasMaximum of
|
|
|
+ true ->
|
|
|
+ {Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
|
|
|
+ false ->
|
|
|
+ deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState),
|
|
|
+ {error, ?RC_TOPIC_ALIAS_INVALID}
|
|
|
+ end;
|
|
|
|
|
|
preprocess_properties(Packet, PState) ->
|
|
|
{Packet, PState}.
|
|
|
@@ -278,7 +315,6 @@ preprocess_properties(Packet, PState) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Process MQTT Packet
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-
|
|
|
process_packet(?CONNECT_PACKET(
|
|
|
#mqtt_packet_connect{proto_name = ProtoName,
|
|
|
proto_ver = ProtoVer,
|
|
|
@@ -308,6 +344,7 @@ process_packet(?CONNECT_PACKET(
|
|
|
will_msg = WillMsg,
|
|
|
is_bridge = IsBridge,
|
|
|
connected_at = os:timestamp()}),
|
|
|
+
|
|
|
connack(
|
|
|
case check_connect(Connect, PState1) of
|
|
|
{ok, PState2} ->
|
|
|
@@ -342,9 +379,6 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt
|
|
|
case check_publish(Packet, PState) of
|
|
|
{ok, PState1} ->
|
|
|
do_publish(Packet, PState1);
|
|
|
- {error, ?RC_TOPIC_ALIAS_INVALID} ->
|
|
|
- ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]),
|
|
|
- {error, ?RC_TOPIC_ALIAS_INVALID, PState};
|
|
|
{error, ReasonCode} ->
|
|
|
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
|
|
|
[Topic, emqx_reason_codes:text(ReasonCode)]),
|
|
|
@@ -523,7 +557,8 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
|
|
|
proto_ver = ?MQTT_PROTO_V5,
|
|
|
client_id = ClientId,
|
|
|
conn_props = ConnProps,
|
|
|
- is_assigned = IsAssigned}) ->
|
|
|
+ is_assigned = IsAssigned,
|
|
|
+ topic_alias_maximum = TopicAliasMaximum}) ->
|
|
|
ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of
|
|
|
{ok, 1} ->
|
|
|
iolist_to_binary(emqx_config:get_env(response_topic_prefix));
|
|
|
@@ -561,17 +596,20 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
|
|
|
undefined -> Props2;
|
|
|
Keepalive -> Props2#{'Server-Keep-Alive' => Keepalive}
|
|
|
end,
|
|
|
- send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState);
|
|
|
+
|
|
|
+ PState1 = PState#pstate{topic_alias_maximum = TopicAliasMaximum#{from_client => MaxAlias}},
|
|
|
+
|
|
|
+ send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props3), PState1);
|
|
|
|
|
|
deliver({connack, ReasonCode, SP}, PState) ->
|
|
|
send(?CONNACK_PACKET(ReasonCode, SP), PState);
|
|
|
|
|
|
-deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
|
|
|
+deliver({publish, PacketId, Msg = #message{headers = Headers}}, PState = #pstate{mountpoint = MountPoint}) ->
|
|
|
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
|
|
|
Msg1 = emqx_message:update_expiry(Msg),
|
|
|
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
|
|
|
- send(emqx_packet:from_message(PacketId, Msg2), PState);
|
|
|
-
|
|
|
+ send(emqx_packet:from_message(PacketId, Msg2#message{headers = maps:remove('Topic-Alias', Headers)}), PState);
|
|
|
+
|
|
|
deliver({puback, PacketId, ReasonCode}, PState) ->
|
|
|
send(?PUBACK_PACKET(PacketId, ReasonCode), PState);
|
|
|
|
|
|
@@ -758,18 +796,11 @@ check_publish(Packet, PState) ->
|
|
|
run_check_steps([fun check_pub_caps/2,
|
|
|
fun check_pub_acl/2], Packet, PState).
|
|
|
|
|
|
-check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
|
|
|
- variable = #mqtt_packet_publish{
|
|
|
- properties = #{'Topic-Alias' := TopicAlias}
|
|
|
- }},
|
|
|
- #pstate{zone = Zone}) ->
|
|
|
- emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias});
|
|
|
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain},
|
|
|
variable = #mqtt_packet_publish{ properties = _Properties}},
|
|
|
#pstate{zone = Zone}) ->
|
|
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
|
|
|
|
|
|
-
|
|
|
check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
|
|
|
when IsSuper orelse (not EnableAcl) ->
|
|
|
ok;
|