Browse Source

Merge pull request #3929 from z8674558/feat/coap-acl

feat(coap): use emqx_access_control:check_acl before pub/sub
Yudai Kiyofuji 5 years ago
parent
commit
8a50c70ca3
1 changed files with 39 additions and 15 deletions
  1. 39 15
      apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl

+ 39 - 15
apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl

@@ -50,6 +50,8 @@
 
 
 -record(state, {peername, clientid, username, password, sub_topics = [], connected_at}).
 -record(state, {peername, clientid, username, password, sub_topics = [], connected_at}).
 
 
+-type(state() :: #state{}).
+
 -define(ALIVE_INTERVAL, 20000).
 -define(ALIVE_INTERVAL, 20000).
 
 
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@@ -74,8 +76,10 @@ client_pid(ClientId, Username, Password, Channel) ->
 start(ClientId, Username, Password, Channel) ->
 start(ClientId, Username, Password, Channel) ->
     % DO NOT use start_link, since multiple coap_reponsder may have relation with one mqtt adapter,
     % DO NOT use start_link, since multiple coap_reponsder may have relation with one mqtt adapter,
     % one coap_responder crashes should not make mqtt adapter crash too
     % one coap_responder crashes should not make mqtt adapter crash too
-    % And coap_responder is not a system process, it is dangerous to link mqtt adapter to coap_responder
-    gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}}, ?MODULE, {ClientId, Username, Password, Channel}, []).
+    % And coap_responder is not a system process
+    % it is dangerous to link mqtt adapter to coap_responder
+    gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}},
+                     ?MODULE, {ClientId, Username, Password, Channel}, []).
 
 
 stop(Pid) ->
 stop(Pid) ->
     gen_server:stop(Pid).
     gen_server:stop(Pid).
@@ -107,12 +111,12 @@ init({ClientId, Username, Password, Channel}) ->
     _ = run_hooks('client.connect', [conninfo(State0)], undefined),
     _ = run_hooks('client.connect', [conninfo(State0)], undefined),
     case emqx_access_control:authenticate(clientinfo(State0)) of
     case emqx_access_control:authenticate(clientinfo(State0)) of
         {ok, _AuthResult} ->
         {ok, _AuthResult} ->
+            ok = emqx_cm:discard_session(ClientId),
+
             _ = run_hooks('client.connack', [conninfo(State0), success], undefined),
             _ = run_hooks('client.connack', [conninfo(State0), success], undefined),
 
 
             State = State0#state{connected_at = erlang:system_time(millisecond)},
             State = State0#state{connected_at = erlang:system_time(millisecond)},
 
 
-            %% TODO: Evict same clientid on other node??
-
             run_hooks('client.connected', [clientinfo(State), conninfo(State)]),
             run_hooks('client.connected', [clientinfo(State), conninfo(State)]),
 
 
             erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
             erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
@@ -164,7 +168,8 @@ handle_cast(Msg, State) ->
     ?LOG(error, "broker_api unexpected cast ~p", [Msg]),
     ?LOG(error, "broker_api unexpected cast ~p", [Msg]),
     {noreply, State, hibernate}.
     {noreply, State, hibernate}.
 
 
-handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, State = #state{sub_topics = Subscribers}) ->
+handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
+            State = #state{sub_topics = Subscribers}) ->
     deliver([{Topic, Payload}], Subscribers),
     deliver([{Topic, Payload}], Subscribers),
     {noreply, State, hibernate};
     {noreply, State, hibernate};
 
 
@@ -181,6 +186,11 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
     ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     {stop, {shutdown, conflict}, State};
     {stop, {shutdown, conflict}, State};
 
 
+handle_info(discard, State) ->
+    ?LOG(warning, "the connection is discarded. " ++
+                  "possibly there is another client with the same clientid", []),
+    {stop, {shutdown, discarded}, State};
+
 handle_info(kick, State) ->
 handle_info(kick, State) ->
     ?LOG(info, "Kicked", []),
     ?LOG(info, "Kicked", []),
     {stop, {shutdown, kick}, State};
     {stop, {shutdown, kick}, State};
@@ -206,8 +216,14 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
 chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
     ?LOG(debug, "subscribe Topic=~p", [Topic]),
     ?LOG(debug, "subscribe Topic=~p", [Topic]),
-    emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS),
-    emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]).
+    case emqx_access_control:check_acl(clientinfo(State), subscribe, Topic) of
+        allow ->
+            emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS),
+            emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]);
+        deny  ->
+            ?LOG(warning, "subscribe to ~p by clientid ~p failed due to acl check.",
+                 [Topic, ClientId])
+    end.
 
 
 chann_unsubscribe(Topic, State) ->
 chann_unsubscribe(Topic, State) ->
     ?LOG(debug, "unsubscribe Topic=~p", [Topic]),
     ?LOG(debug, "unsubscribe Topic=~p", [Topic]),
@@ -215,11 +231,18 @@ chann_unsubscribe(Topic, State) ->
     emqx_broker:unsubscribe(Topic),
     emqx_broker:unsubscribe(Topic),
     emqx_hooks:run('session.unsubscribed', [clientinfo(State), Topic, Opts]).
     emqx_hooks:run('session.unsubscribed', [clientinfo(State), Topic, Opts]).
 
 
-chann_publish(Topic, Payload, #state{clientid = ClientId}) ->
+chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
     ?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]),
     ?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]),
-    emqx_broker:publish(
-        emqx_message:set_flag(retain, false,
-            emqx_message:make(ClientId, ?QOS_0, Topic, Payload))).
+    case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
+        allow ->
+            emqx_broker:publish(
+                emqx_message:set_flag(retain, false,
+                                      emqx_message:make(ClientId, ?QOS_0, Topic, Payload)));
+        deny  ->
+            ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
+                 [Topic, ClientId])
+    end.
+
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Deliver
 %% Deliver
@@ -242,7 +265,8 @@ deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
                     true  -> emqx_topic:match(TopicName, TopicFilter);
                     true  -> emqx_topic:match(TopicName, TopicFilter);
                     false -> TopicName =:= TopicFilter
                     false -> TopicName =:= TopicFilter
                 end,
                 end,
-    %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p", [Matched, CoapPid, TopicName, Payload, T]),
+    %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
+    %     [Matched, CoapPid, TopicName, Payload, T]),
     Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
     Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
     deliver_to_coap(TopicName, Payload, T).
     deliver_to_coap(TopicName, Payload, T).
 
 
@@ -267,7 +291,7 @@ info(State) ->
 sockinfo(#state{peername = Peername}) ->
 sockinfo(#state{peername = Peername}) ->
     #{socktype => udp,
     #{socktype => udp,
       peername => Peername,
       peername => Peername,
-      sockname => {{127,0,0,1}, 5683},    %% FIXME: Sock?
+      sockname => {{127, 0, 0, 1}, 5683},    %% FIXME: Sock?
       sockstate =>  running,
       sockstate =>  running,
       active_n => 1
       active_n => 1
      }.
      }.
@@ -285,7 +309,7 @@ conninfo(#state{peername = Peername,
                 clientid = ClientId,
                 clientid = ClientId,
                 connected_at = ConnectedAt}) ->
                 connected_at = ConnectedAt}) ->
     #{socktype => udp,
     #{socktype => udp,
-      sockname => {{127,0,0,1}, 5683},
+      sockname => {{127, 0, 0, 1}, 5683},
       peername => Peername,
       peername => Peername,
       peercert => nossl,        %% TODO: dtls
       peercert => nossl,        %% TODO: dtls
       conn_mod => ?MODULE,
       conn_mod => ?MODULE,
@@ -317,7 +341,7 @@ session_info(#state{sub_topics = SubTopics, connected_at = ConnectedAt}) ->
 
 
 %% The stats keys copied from emqx_connection:stats/1
 %% The stats keys copied from emqx_connection:stats/1
 stats(#state{sub_topics = SubTopics}) ->
 stats(#state{sub_topics = SubTopics}) ->
-    SockStats = [{recv_oct,0}, {recv_cnt,0}, {send_oct,0}, {send_cnt,0}, {send_pend,0}],
+    SockStats = [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, {send_pend, 0}],
     ConnStats = emqx_pd:get_counters(?CONN_STATS),
     ConnStats = emqx_pd:get_counters(?CONN_STATS),
     ChanStats = [{subscriptions_cnt, length(SubTopics)},
     ChanStats = [{subscriptions_cnt, length(SubTopics)},
                  {subscriptions_max, length(SubTopics)},
                  {subscriptions_max, length(SubTopics)},