|
@@ -34,7 +34,7 @@
|
|
|
%% Protocol State
|
|
%% Protocol State
|
|
|
-record(proto_state, {peername, sendfun, connected = false,
|
|
-record(proto_state, {peername, sendfun, connected = false,
|
|
|
client_id, client_pid, clean_sess,
|
|
client_id, client_pid, clean_sess,
|
|
|
- proto_ver, proto_name, username,
|
|
|
|
|
|
|
+ proto_ver, proto_name, username, is_superuser = false,
|
|
|
will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
|
|
will_msg, keepalive, max_clientid_len = ?MAX_CLIENTID_LEN,
|
|
|
session, ws_initial_headers, %% Headers from first HTTP request for websocket client
|
|
session, ws_initial_headers, %% Headers from first HTTP request for websocket client
|
|
|
connected_at}).
|
|
connected_at}).
|
|
@@ -159,8 +159,12 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|
|
{ReturnCode1, SessPresent, State3} =
|
|
{ReturnCode1, SessPresent, State3} =
|
|
|
case validate_connect(Var, State1) of
|
|
case validate_connect(Var, State1) of
|
|
|
?CONNACK_ACCEPT ->
|
|
?CONNACK_ACCEPT ->
|
|
|
- case emqttd_access_control:auth(client(State1), Password) of
|
|
|
|
|
|
|
+ Client = client(State1),
|
|
|
|
|
+ case emqttd_access_control:auth(Client, Password) of
|
|
|
ok ->
|
|
ok ->
|
|
|
|
|
+ %% Is Superuser?
|
|
|
|
|
+ IsSuperuser = emqttd_access_control:is_superuser(Client),
|
|
|
|
|
+
|
|
|
%% Generate clientId if null
|
|
%% Generate clientId if null
|
|
|
State2 = maybe_set_clientid(State1),
|
|
State2 = maybe_set_clientid(State1),
|
|
|
|
|
|
|
@@ -172,7 +176,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|
|
%% Start keepalive
|
|
%% Start keepalive
|
|
|
start_keepalive(KeepAlive),
|
|
start_keepalive(KeepAlive),
|
|
|
%% ACCEPT
|
|
%% ACCEPT
|
|
|
- {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session}};
|
|
|
|
|
|
|
+ {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
|
|
|
{error, Error} ->
|
|
{error, Error} ->
|
|
|
exit({shutdown, Error})
|
|
exit({shutdown, Error})
|
|
|
end;
|
|
end;
|
|
@@ -188,12 +192,10 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|
|
%% Send connack
|
|
%% Send connack
|
|
|
send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
|
|
send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3);
|
|
|
|
|
|
|
|
-process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
|
|
|
|
|
- case check_acl(publish, Topic, client(State)) of
|
|
|
|
|
- allow ->
|
|
|
|
|
- publish(Packet, State);
|
|
|
|
|
- deny ->
|
|
|
|
|
- ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
|
|
|
|
|
|
|
+process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State = #proto_state{is_superuser = IsSuper}) ->
|
|
|
|
|
+ case IsSuper orelse allow == check_acl(publish, Topic, client(State)) of
|
|
|
|
|
+ true -> publish(Packet, State);
|
|
|
|
|
+ false -> ?LOG(error, "Cannot publish to ~s for ACL Deny", [Topic], State)
|
|
|
end,
|
|
end,
|
|
|
{ok, State};
|
|
{ok, State};
|
|
|
|
|
|
|
@@ -216,11 +218,14 @@ process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessi
|
|
|
process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
|
|
send(?SUBACK_PACKET(PacketId, []), State);
|
|
send(?SUBACK_PACKET(PacketId, []), State);
|
|
|
|
|
|
|
|
-process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{
|
|
|
|
|
- client_id = ClientId, username = Username, session = Session}) ->
|
|
|
|
|
- Client = client(State),
|
|
|
|
|
- TopicTable = parse_topic_table(RawTopicTable),
|
|
|
|
|
- AllowDenies = [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable],
|
|
|
|
|
|
|
+%% TODO: refactor later...
|
|
|
|
|
+process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session,
|
|
|
|
|
+ client_id = ClientId, username = Username, is_superuser = IsSuperuser}) ->
|
|
|
|
|
+ Client = client(State), TopicTable = parse_topic_table(RawTopicTable),
|
|
|
|
|
+ AllowDenies = if
|
|
|
|
|
+ IsSuperuser -> [];
|
|
|
|
|
+ true -> [check_acl(subscribe, Topic, Client) || {Topic, _Opts} <- TopicTable]
|
|
|
|
|
+ end,
|
|
|
case lists:member(deny, AllowDenies) of
|
|
case lists:member(deny, AllowDenies) of
|
|
|
true ->
|
|
true ->
|
|
|
?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
|
|
?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
|