|
|
@@ -71,6 +71,8 @@
|
|
|
topic_aliases :: emqx_types:topic_aliases(),
|
|
|
%% MQTT Topic Alias Maximum
|
|
|
alias_maximum :: maybe(map()),
|
|
|
+ %% Authentication Data Cache
|
|
|
+ auth_cache :: maybe(map()),
|
|
|
%% Timers
|
|
|
timers :: #{atom() => disabled | maybe(reference())},
|
|
|
%% Conn State
|
|
|
@@ -185,6 +187,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
|
|
topic_aliases = #{inbound => #{},
|
|
|
outbound => #{}
|
|
|
},
|
|
|
+ auth_cache = #{},
|
|
|
timers = #{},
|
|
|
conn_state = idle,
|
|
|
takeover = false,
|
|
|
@@ -216,10 +219,41 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
|
|
fun check_banned/2,
|
|
|
fun auth_connect/2
|
|
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
|
|
- {ok, NConnPkt, NChannel} ->
|
|
|
- process_connect(NConnPkt, NChannel);
|
|
|
+ {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
|
|
+ NChannel1 = NChannel#channel{
|
|
|
+ will_msg = emqx_packet:will_msg(NConnPkt),
|
|
|
+ alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
|
|
+ },
|
|
|
+ case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of
|
|
|
+ {ok, Properties, NChannel2} ->
|
|
|
+ process_connect(Properties, ensure_connected(NChannel2));
|
|
|
+ {continue, Properties, NChannel2} ->
|
|
|
+ handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
|
|
|
+ {error, ReasonCode, NChannel2} ->
|
|
|
+ handle_out(connack, ReasonCode, NChannel2)
|
|
|
+ end;
|
|
|
{error, ReasonCode, NChannel} ->
|
|
|
- handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
|
|
|
+ handle_out(connack, ReasonCode, NChannel)
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties), Channel) ->
|
|
|
+ case enhanced_auth(Packet, Channel) of
|
|
|
+ {ok, NProperties, NChannel} ->
|
|
|
+ process_connect(NProperties, ensure_connected(NChannel));
|
|
|
+ {continue, NProperties, NChannel} ->
|
|
|
+ handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
|
|
|
+ {error, NReasonCode, NChannel} ->
|
|
|
+ handle_out(connack, NReasonCode, NChannel)
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties), Channel) ->
|
|
|
+ case enhanced_auth(Packet, Channel) of
|
|
|
+ {ok, NProperties, NChannel} ->
|
|
|
+ handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
|
|
|
+ {continue, NProperties, NChannel} ->
|
|
|
+ handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
|
|
|
+ {error, NReasonCode, NChannel} ->
|
|
|
+ handle_out(disconnect, NReasonCode, NChannel)
|
|
|
end;
|
|
|
|
|
|
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
|
|
@@ -362,24 +396,23 @@ handle_in(Packet, Channel) ->
|
|
|
%% Process Connect
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
|
|
- Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
|
|
+process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanStart} = ConnInfo, clientinfo = ClientInfo}) ->
|
|
|
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
|
|
{ok, #{session := Session, present := false}} ->
|
|
|
NChannel = Channel#channel{session = Session},
|
|
|
- handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
|
|
|
+ handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel);
|
|
|
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
|
|
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
|
|
NChannel = Channel#channel{session = Session,
|
|
|
resuming = true,
|
|
|
pendings = Pendings1
|
|
|
},
|
|
|
- handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
|
|
|
+ handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel);
|
|
|
{error, client_id_unavailable} ->
|
|
|
- handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
|
|
|
+ handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
|
|
|
{error, Reason} ->
|
|
|
?LOG(error, "Failed to open session due to ~p", [Reason]),
|
|
|
- handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
|
|
|
+ handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -579,18 +612,17 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|
|
| {ok, replies(), channel()}
|
|
|
| {shutdown, Reason :: term(), channel()}
|
|
|
| {shutdown, Reason :: term(), replies(), channel()}).
|
|
|
-handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
AckProps = run_fold([fun enrich_connack_caps/2,
|
|
|
fun enrich_server_keepalive/2,
|
|
|
fun enrich_assigned_clientid/2
|
|
|
- ], #{}, Channel),
|
|
|
+ ], Props, Channel),
|
|
|
NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
|
|
|
|
|
|
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
|
|
- ensure_keepalive(NAckProps,
|
|
|
- ensure_connected(ConnPkt, Channel)));
|
|
|
+ ensure_keepalive(NAckProps, Channel));
|
|
|
|
|
|
-handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
Reason = emqx_reason_codes:name(ReasonCode),
|
|
|
AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
|
|
|
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
|
|
@@ -643,6 +675,9 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
|
|
|
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
|
|
|
{ok, {close, ReasonName}, Channel};
|
|
|
|
|
|
+handle_out(auth, {ReasonCode, Properties}, Channel) ->
|
|
|
+ {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
|
|
|
+
|
|
|
handle_out(Type, Data, Channel) ->
|
|
|
?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
|
|
|
{ok, Channel}.
|
|
|
@@ -1069,6 +1104,55 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
|
|
|
is_anonymous(#{anonymous := true}) -> true;
|
|
|
is_anonymous(_AuthResult) -> false.
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Enhanced Authentication
|
|
|
+
|
|
|
+enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{
|
|
|
+ proto_ver = Protover,
|
|
|
+ properties = Properties
|
|
|
+ }), Channel) ->
|
|
|
+ case Protover of
|
|
|
+ ?MQTT_PROTO_V5 ->
|
|
|
+ AuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
|
|
|
+ AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
|
|
|
+ do_enhanced_auth(AuthMethod, AuthData, Channel);
|
|
|
+ _ ->
|
|
|
+ {ok, #{}, Channel}
|
|
|
+ end;
|
|
|
+
|
|
|
+enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+ AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined),
|
|
|
+ NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
|
|
|
+ AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
|
|
|
+ case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of
|
|
|
+ true ->
|
|
|
+ {error, emqx_reason_codes:connack_error(bad_authentication_method), Channel};
|
|
|
+ false ->
|
|
|
+ do_enhanced_auth(AuthMethod, AuthData, Channel)
|
|
|
+ end.
|
|
|
+
|
|
|
+do_enhanced_auth(undefined, undefined, Channel) ->
|
|
|
+ {ok, #{}, Channel};
|
|
|
+do_enhanced_auth(undefined, _AuthData, Channel) ->
|
|
|
+ {error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
|
|
+do_enhanced_auth(_AuthMethod, undefined, Channel) ->
|
|
|
+ {error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
|
|
+do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
|
|
|
+ case do_auth_check(AuthMethod, AuthData, Cache) of
|
|
|
+ ok -> {ok, #{}, Channel#channel{auth_cache = #{}}};
|
|
|
+ {ok, NAuthData} ->
|
|
|
+ NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
|
|
+ {ok, NProperties, Channel#channel{auth_cache = #{}}};
|
|
|
+ {continue, NAuthData, NCache} ->
|
|
|
+ NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
|
|
+ {continue, NProperties, Channel#channel{auth_cache = NCache}};
|
|
|
+ {error, _Reason} ->
|
|
|
+ {error, emqx_reason_codes:connack_error(not_authorized), Channel}
|
|
|
+ end.
|
|
|
+
|
|
|
+do_auth_check(_AuthMethod, _AuthData, _AuthDataCache) ->
|
|
|
+ {error, not_authorized}.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Topic Alias
|
|
|
|
|
|
@@ -1259,14 +1343,12 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Ensure connected
|
|
|
|
|
|
-ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
|
|
- clientinfo = ClientInfo}) ->
|
|
|
+ensure_connected(Channel = #channel{conninfo = ConnInfo,
|
|
|
+ clientinfo = ClientInfo}) ->
|
|
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
|
|
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
|
|
Channel#channel{conninfo = NConnInfo,
|
|
|
- conn_state = connected,
|
|
|
- will_msg = emqx_packet:will_msg(ConnPkt),
|
|
|
- alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
|
|
+ conn_state = connected
|
|
|
}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|