|
|
@@ -159,12 +159,8 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|
|
{ReturnCode1, SessPresent, State3} =
|
|
|
case validate_connect(Var, State1) of
|
|
|
?CONNACK_ACCEPT ->
|
|
|
- Client = client(State1),
|
|
|
- case emqttd_access_control:auth(Client, Password) of
|
|
|
- ok ->
|
|
|
- %% Is Superuser?
|
|
|
- IsSuperuser = emqttd_access_control:is_superuser(Client),
|
|
|
-
|
|
|
+ case authenticate(client(State1), Password) of
|
|
|
+ {ok, IsSuperuser} ->
|
|
|
%% Generate clientId if null
|
|
|
State2 = maybe_set_clientid(State1),
|
|
|
|
|
|
@@ -190,7 +186,9 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|
|
%% Run hooks
|
|
|
emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)),
|
|
|
%% Send connack
|
|
|
- send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
|
|
|
+ send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
|
|
|
+ %% stop if authentication failure
|
|
|
+ stop_if_auth_failure(ReturnCode1, State3);
|
|
|
|
|
|
process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) ->
|
|
|
case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of
|
|
|
@@ -302,6 +300,12 @@ trace(send, Packet, ProtoState) ->
|
|
|
redeliver({?PUBREL, PacketId}, State) ->
|
|
|
send(?PUBREL_PACKET(PacketId), State).
|
|
|
|
|
|
+stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
|
|
|
+ {stop, {shutdown, auth_failure}, State};
|
|
|
+
|
|
|
+stop_if_auth_failure(_RC, State) ->
|
|
|
+ {ok, State}.
|
|
|
+
|
|
|
shutdown(_Error, #proto_state{client_id = undefined}) ->
|
|
|
ignore;
|
|
|
|
|
|
@@ -435,6 +439,13 @@ parse_topic_table(TopicTable) ->
|
|
|
parse_topics(Topics) ->
|
|
|
[emqttd_topic:parse(Topic) || Topic <- Topics].
|
|
|
|
|
|
+authenticate(Client, Password) ->
|
|
|
+ case emqttd_access_control:auth(Client, Password) of
|
|
|
+ ok -> {ok, false};
|
|
|
+ {ok, IsSuper} -> {ok, IsSuper};
|
|
|
+ {error, Error} -> {error, Error}
|
|
|
+ end.
|
|
|
+
|
|
|
%% PUBLISH ACL is cached in process dictionary.
|
|
|
check_acl(publish, Topic, Client) ->
|
|
|
IfCache = emqttd:conf(cache_acl, true),
|