Browse Source

Merge pull request #7440 from EMQ-YangM/add_connack_and_cac

feat: add rule events: 'client.connack', 'client.check_authz_complete'
Yang Miao 3 years ago
parent
commit
d72da3ecf6

+ 4 - 0
apps/emqx/src/emqx_access_control.erl

@@ -60,6 +60,10 @@ check_authorization_cache(ClientInfo, PubSub, Topic) ->
             emqx_authz_cache:put_authz_cache(PubSub, Topic, AuthzResult),
             AuthzResult;
         AuthzResult ->
+            emqx:run_hook(
+                'client.check_authz_complete',
+                [ClientInfo, PubSub, Topic, AuthzResult, cache]
+            ),
             inc_acl_metrics(cache_hit),
             AuthzResult
     end.

+ 9 - 3
apps/emqx_authz/src/emqx_authz.erl

@@ -283,14 +283,18 @@ authorize(#{username := Username,
             peerhost := IpAddress
            } = Client, PubSub, Topic, DefaultResult, Sources) ->
     case do_authorize(Client, PubSub, Topic, Sources) of
-        {matched, allow} ->
+        {{matched, allow}, AuthzSource}->
+            emqx:run_hook('client.check_authz_complete',
+                          [Client, PubSub, Topic, allow, AuthzSource]),
             ?SLOG(info, #{msg => "authorization_permission_allowed",
                           username => Username,
                           ipaddr => IpAddress,
                           topic => Topic}),
             emqx_metrics:inc(?METRIC_ALLOW),
             {stop, allow};
-        {matched, deny} ->
+        {{matched, deny}, AuthzSource}->
+            emqx:run_hook('client.check_authz_complete',
+                          [Client, PubSub, Topic, deny, AuthzSource]),
             ?SLOG(info, #{msg => "authorization_permission_denied",
                           username => Username,
                           ipaddr => IpAddress,
@@ -298,6 +302,8 @@ authorize(#{username := Username,
             emqx_metrics:inc(?METRIC_DENY),
             {stop, deny};
         nomatch ->
+            emqx:run_hook('client.check_authz_complete',
+                          [Client, PubSub, Topic, DefaultResult, default]),
             ?SLOG(info, #{msg => "authorization_failed_nomatch",
                           username => Username,
                           ipaddr => IpAddress,
@@ -316,7 +322,7 @@ do_authorize(Client, PubSub, Topic,
     Module = authz_module(Type),
     case Module:authorize(Client, PubSub, Topic, Connector) of
         nomatch -> do_authorize(Client, PubSub, Topic, Tail);
-        Matched -> Matched
+        Matched -> {Matched, Type}
     end.
 
 %%--------------------------------------------------------------------

+ 27 - 0
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -73,6 +73,8 @@ fields("rule_test") ->
                                    , ref("ctx_dropped")
                                    , ref("ctx_connected")
                                    , ref("ctx_disconnected")
+                                   , ref("ctx_connack")
+                                   , ref("ctx_check_authz_complete")
                                    , ref("ctx_bridge_mqtt")
                                    ]),
         #{desc => "The context of the event for testing",
@@ -208,6 +210,31 @@ fields("ctx_disconnected") ->
         desc => "The Time that this Client is Disconnected"})}
     ];
 
+fields("ctx_connack") ->
+    [ {"event_type", sc(client_connack, #{desc => "Event Type", required => true})}
+    , {"reason_code", sc(binary(), #{desc => "The reason code"})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"clean_start", sc(boolean(), #{desc => "Clean Start", default => true})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})}
+    , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})}
+    , {"proto_name", sc(binary(), #{desc => "Protocol Name"})}
+    , {"proto_ver", sc(binary(), #{desc => "Protocol Version"})}
+    , {"keepalive", sc(integer(), #{desc => "KeepAlive"})}
+    , {"expiry_interval", sc(integer(), #{desc => "Expiry Interval"})}
+    , {"connected_at", sc(integer(), #{
+        desc => "The Time that this Client is Connected"})}
+    ];
+fields("ctx_check_authz_complete") ->
+    [ {"event_type", sc(client_check_authz_complete, #{desc => "Event Type", required => true})}
+    , {"clientid", sc(binary(), #{desc => "The Client ID"})}
+    , {"username", sc(binary(), #{desc => "The User Name"})}
+    , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})}
+    , {"topic", sc(binary(), #{desc => "Message Topic"})}
+    , {"action", sc(binary(), #{desc => "Publish or Subscribe"})}
+    , {"authz_source", sc(binary(), #{desc => "Cache, Plugs or Default"})}
+    , {"result", sc(binary(), #{desc => "Allow or Deny"})}
+    ];
 fields("ctx_bridge_mqtt") ->
     [ {"event_type", sc('$bridges/mqtt:*', #{desc => "Event Type", required => true})}
     , {"id", sc(binary(), #{desc => "Message ID"})}

+ 122 - 0
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -33,6 +33,8 @@
 
 -export([ on_client_connected/3
         , on_client_disconnected/4
+        , on_client_connack/4
+        , on_client_check_authz_complete/6
         , on_session_subscribed/4
         , on_session_unsubscribed/4
         , on_message_publish/2
@@ -60,6 +62,8 @@
 event_names() ->
     [ 'client.connected'
     , 'client.disconnected'
+    , 'client.connack'
+    , 'client.check_authz_complete'
     , 'session.subscribed'
     , 'session.unsubscribed'
     , 'message.publish'
@@ -108,6 +112,18 @@ on_client_connected(ClientInfo, ConnInfo, Env) ->
     apply_event('client.connected',
         fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env).
 
+on_client_connack(ConnInfo, Reason, _, Env) ->
+    apply_event('client.connack',
+                          fun() -> eventmsg_connack(ConnInfo, Reason) end, Env).
+
+on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) ->
+    apply_event('client.check_authz_complete',
+                          fun() -> eventmsg_check_authz_complete(ClientInfo,
+                                                               PubSub,
+                                                               Topic,
+                                                               Result,
+                                                               AuthzSource) end, Env).
+
 on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
     apply_event('client.disconnected',
         fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env).
@@ -235,6 +251,49 @@ eventmsg_disconnected(_ClientInfo = #{
           disconnected_at => DisconnectedAt
         }).
 
+eventmsg_connack(_ConnInfo = #{
+                    clientid := ClientId,
+                    clean_start := CleanStart,
+                    username := Username,
+                    peername := PeerName,
+                    sockname := SockName,
+                    proto_name := ProtoName,
+                    proto_ver := ProtoVer,
+                    keepalive := Keepalive,
+                    connected_at := ConnectedAt,
+                    conn_props := ConnProps,
+                    expiry_interval := ExpiryInterval
+                   }, Reason) ->
+    with_basic_columns('client.connack',
+        #{reason_code => reason(Reason),
+          clientid => ClientId,
+          clean_start => CleanStart,
+          username => Username,
+          peername => ntoa(PeerName),
+          sockname => ntoa(SockName),
+          proto_name => ProtoName,
+          proto_ver => ProtoVer,
+          keepalive => Keepalive,
+          expiry_interval => ExpiryInterval,
+          connected_at => ConnectedAt,
+          conn_props => printable_maps(ConnProps)
+        }).
+
+eventmsg_check_authz_complete(_ClientInfo = #{
+                                            clientid := ClientId,
+                                            username := Username,
+                                            peerhost := PeerHost
+                                           }, PubSub, Topic, Result, AuthzSource) ->
+    with_basic_columns('client.check_authz_complete',
+                       #{clientid => ClientId,
+                         username => Username,
+                         peerhost => ntoa(PeerHost),
+                         topic    => Topic,
+                         action   => PubSub,
+                         authz_source => AuthzSource,
+                         result   => Result
+                        }).
+
 eventmsg_sub_or_unsub(Event, _ClientInfo = #{
                     clientid := ClientId,
                     username := Username,
@@ -378,6 +437,8 @@ event_info() ->
     , event_info_message_dropped()
     , event_info_client_connected()
     , event_info_client_disconnected()
+    , event_info_client_connack()
+    , event_info_client_check_authz_complete()
     , event_info_session_subscribed()
     , event_info_session_unsubscribed()
     , event_info_delivery_dropped()
@@ -435,6 +496,20 @@ event_info_client_disconnected() ->
         {<<"client disconnected">>, <<"连接断开"/utf8>>},
         <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">>
     ).
+event_info_client_connack() ->
+    event_info_common(
+      'client.connack',
+      {<<"client connack">>, <<"连接确认"/utf8>>},
+      {<<"client connack">>, <<"连接确认"/utf8>>},
+      <<"SELECT * FROM \"$events/client_connack\"">>
+     ).
+event_info_client_check_authz_complete() ->
+    event_info_common(
+      'client.check_authz_complete',
+      {<<"client check authz complete">>, <<"鉴权结果"/utf8>>},
+      {<<"client check authz complete">>, <<"鉴权结果"/utf8>>},
+      <<"SELECT * FROM \"$events/client_check_authz_complete\"">>
+     ).
 event_info_session_subscribed() ->
     event_info_common(
         'session.subscribed',
@@ -500,6 +575,18 @@ test_columns('client.disconnected') ->
     , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
     , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]}
     ];
+test_columns('client.connack') ->
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
+    , {<<"reason_code">>, [<<"sucess">>, <<"the reason code">>]}
+    ];
+test_columns('client.check_authz_complete') ->
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
+    , {<<"topic">>,    [<<"t/1">>, <<"the topic of the MQTT message">>]}
+    , {<<"action">>,   [<<"publish">>, <<"the action of publish or subscribe">>]}
+    , {<<"result">>,   [<<"allow">>,<<"the authz check complete result">>]}
+    ];
 test_columns('session.unsubscribed') ->
     test_columns('session.subscribed');
 test_columns('session.subscribed') ->
@@ -600,6 +687,35 @@ columns_with_exam('client.disconnected') ->
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
+columns_with_exam('client.connack') ->
+    [ {<<"event">>, 'client.connected'}
+    , {<<"reason_code">>, success}
+    , {<<"clientid">>, <<"c_emqx">>}
+    , {<<"username">>, <<"u_emqx">>}
+    , {<<"peername">>, <<"192.168.0.10:56431">>}
+    , {<<"sockname">>, <<"0.0.0.0:1883">>}
+    , {<<"proto_name">>, <<"MQTT">>}
+    , {<<"proto_ver">>, 5}
+    , {<<"keepalive">>, 60}
+    , {<<"clean_start">>, true}
+    , {<<"expiry_interval">>, 3600}
+    , {<<"connected_at">>, erlang:system_time(millisecond)}
+    , columns_example_props(conn_props)
+    , {<<"timestamp">>, erlang:system_time(millisecond)}
+    , {<<"node">>, node()}
+    ];
+columns_with_exam('client.check_authz_complete') ->
+    [ {<<"event">>, 'client.check_authz_complete'}
+    , {<<"clientid">>, <<"c_emqx">>}
+    , {<<"username">>, <<"u_emqx">>}
+    , {<<"peerhost">>, <<"192.168.0.10">>}
+    , {<<"topic">>, <<"t/a">>}
+    , {<<"action">>, <<"publish">>}
+    , {<<"authz_source">>, <<"cache">>}
+    , {<<"result">>, <<"allow">>}
+    , {<<"timestamp">>, erlang:system_time(millisecond)}
+    , {<<"node">>, node()}
+    ];
 columns_with_exam('session.subscribed') ->
     [ columns_example_props(sub_props)
     ] ++ columns_message_sub_unsub('session.subscribed');
@@ -710,6 +826,9 @@ ntoa(IpAddr) ->
 
 event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
 event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
+event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
+event_name(<<"$events/client_check_authz_complete", _/binary>>) ->
+    'client.check_authz_complete';
 event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
 event_name(<<"$events/session_unsubscribed", _/binary>>) ->
     'session.unsubscribed';
@@ -722,6 +841,9 @@ event_name(_) -> 'message.publish'.
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
 event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
+event_topic('client.connack') -> <<"$events/client_connack">>;
+event_topic('client.check_authz_complete') ->
+    <<"$events/client_check_authz_complete">>;
 event_topic('session.subscribed') -> <<"$events/session_subscribed">>;
 event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;

+ 66 - 3
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -105,13 +105,24 @@ groups() ->
 
 init_per_suite(Config) ->
     application:load(emqx_conf),
-    ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
+    ok = emqx_common_test_helpers:start_apps(
+              [emqx_conf, emqx_rule_engine, emqx_authz],
+               fun set_special_configs/1),
     Config.
 
 end_per_suite(_Config) ->
     emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
     ok.
 
+set_special_configs(emqx_authz) ->
+    {ok, _} = emqx:update_config(
+                [authorization],
+                #{<<"no_match">> => atom_to_binary(allow),
+                  <<"cache">> => #{<<"enable">> => atom_to_binary(true)},
+                  <<"sources">> => []}),
+    ok;
+set_special_configs(_) ->
+    ok.
 on_resource_create(_id, _) -> #{}.
 on_resource_destroy(_id, _) -> ok.
 on_get_resource_status(_id, _) -> #{}.
@@ -139,6 +150,8 @@ init_per_testcase(t_events, Config) ->
     init_events_counters(),
     SQL = "SELECT * FROM \"$events/client_connected\", "
                         "\"$events/client_disconnected\", "
+                        "\"$events/client_connack\", "
+                        "\"$events/client_check_authz_complete\", "
                         "\"$events/session_subscribed\", "
                         "\"$events/session_unsubscribed\", "
                         "\"$events/message_acked\", "
@@ -321,7 +334,7 @@ t_events(_Config) ->
         , {proto_ver, v5}
         , {properties, #{'Session-Expiry-Interval' => 60}}
         ]),
-    ct:pal("====== verify $events/client_connected"),
+    ct:pal("====== verify $events/client_connected, $events/client_connack"),
     client_connected(Client, Client2),
     ct:pal("====== verify $events/message_dropped"),
     message_dropped(Client),
@@ -349,6 +362,7 @@ message_publish(Client) ->
 client_connected(Client, Client2) ->
     {ok, _} = emqtt:connect(Client),
     {ok, _} = emqtt:connect(Client2),
+    verify_event('client.connack'),
     verify_event('client.connected'),
     ok.
 client_disconnected(Client, Client2) ->
@@ -359,6 +373,7 @@ client_disconnected(Client, Client2) ->
 session_subscribed(Client2) ->
     {ok, _, _} = emqtt:subscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>, 1),
     verify_event('session.subscribed'),
+    verify_event('client.check_authz_complete'),
     ok.
 session_unsubscribed(Client2) ->
     {ok, _, _} = emqtt:unsubscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>),
@@ -1638,7 +1653,55 @@ verify_event_fields('message.acked', Fields) ->
     ?assert(is_map(PubAckProps)),
     ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
     ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
-    ?assert(EventAt =< Timestamp).
+    ?assert(EventAt =< Timestamp);
+
+verify_event_fields('client.connack', Fields) ->
+    #{clientid := ClientId,
+      clean_start := CleanStart,
+      username := Username,
+      peername := PeerName,
+      sockname := SockName,
+      proto_name := ProtoName,
+      proto_ver := ProtoVer,
+      keepalive := Keepalive,
+      expiry_interval := ExpiryInterval,
+      conn_props := Properties,
+      timestamp := Timestamp,
+      connected_at := EventAt
+    } = Fields,
+    Now = erlang:system_time(millisecond),
+    TimestampElapse = Now - Timestamp,
+    RcvdAtElapse = Now - EventAt,
+    ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
+    verify_peername(PeerName),
+    verify_peername(SockName),
+    ?assertEqual(<<"MQTT">>, ProtoName),
+    ?assertEqual(5, ProtoVer),
+    ?assert(is_integer(Keepalive)),
+    ?assert(is_boolean(CleanStart)),
+    ?assertEqual(60000, ExpiryInterval),
+    ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
+    ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
+    ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
+    ?assert(EventAt =< Timestamp);
+
+verify_event_fields('client.check_authz_complete', Fields) ->
+    #{clientid := ClientId,
+      action := Action,
+      result := Result,
+      topic := Topic,
+      authz_source := AuthzSource,
+      username := Username
+     } = Fields,
+    ?assertEqual(<<"t1">>, Topic),
+    ?assert(lists:member(Action, [subscribe, publish])),
+    ?assert(lists:member(Result, [allow, deny])),
+    ?assert(lists:member(AuthzSource, [cache, default, file,
+                                       http, mongodb, mysql, redis,
+                                       postgresql, built_in_database])),
+    ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
+    ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])).
 
 verify_peername(PeerName) ->
     case string:split(PeerName, ":") of