瀏覽代碼

refactor: use static function references

Zaiming (Stone) Shi 3 年之前
父節點
當前提交
cca3421308

+ 49 - 73
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -65,10 +65,7 @@ fields("rule_info") ->
     ] ++ fields("rule_creation");
 %% TODO: we can delete this API if the Dashboard not depends on it
 fields("rule_events") ->
-    ETopics = [
-        binary_to_atom(emqx_rule_events:event_topic(E))
-     || E <- emqx_rule_events:event_names()
-    ],
+    ETopics = emqx_rule_events:event_topics_enum(),
     [
         {"event", sc(hoconsc:enum(ETopics), #{desc => ?DESC("rs_event"), required => true})},
         {"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})},
@@ -150,77 +147,43 @@ fields("node_metrics") ->
         fields("metrics");
 fields("ctx_pub") ->
     [
-        {"event_type", sc(message_publish, #{desc => ?DESC("event_event_type"), required => true})},
-        {"id", sc(binary(), #{desc => ?DESC("event_id")})},
-        {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
-        {"username", sc(binary(), #{desc => ?DESC("event_username")})},
-        {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
-        {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
-        {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
-        {"publish_received_at",
-            sc(integer(), #{
-                desc => ?DESC("event_publish_received_at")
-            })}
-    ] ++ [qos()];
+        {"event_type", event_type_sc(message_publish)},
+        {"id", sc(binary(), #{desc => ?DESC("event_id")})}
+        | msg_event_common_fields()
+    ];
 fields("ctx_sub") ->
     [
-        {"event_type",
-            sc(session_subscribed, #{desc => ?DESC("event_event_type"), required => true})},
-        {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
-        {"username", sc(binary(), #{desc => ?DESC("event_username")})},
-        {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
-        {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
-        {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
-        {"publish_received_at",
-            sc(integer(), #{
-                desc => ?DESC("event_publish_received_at")
-            })}
-    ] ++ [qos()];
+        {"event_type", event_type_sc(session_subscribed)}
+        | msg_event_common_fields()
+    ];
 fields("ctx_unsub") ->
     [
-        {"event_type",
-            sc(session_unsubscribed, #{desc => ?DESC("event_event_type"), required => true})}
-    ] ++
-        proplists:delete("event_type", fields("ctx_sub"));
+        {"event_type", event_type_sc(session_unsubscribed)}
+        | proplists:delete("event_type", fields("ctx_sub"))
+    ];
 fields("ctx_delivered") ->
     [
-        {"event_type",
-            sc(message_delivered, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(message_delivered)},
         {"id", sc(binary(), #{desc => ?DESC("event_id")})},
         {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
-        {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})},
-        {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
-        {"username", sc(binary(), #{desc => ?DESC("event_username")})},
-        {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
-        {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
-        {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
-        {"publish_received_at",
-            sc(integer(), #{
-                desc => ?DESC("event_publish_received_at")
-            })}
-    ] ++ [qos()];
+        {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
+        | msg_event_common_fields()
+    ];
 fields("ctx_acked") ->
-    [{"event_type", sc(message_acked, #{desc => ?DESC("event_event_type"), required => true})}] ++
-        proplists:delete("event_type", fields("ctx_delivered"));
+    [
+        {"event_type", event_type_sc(message_acked)}
+        | proplists:delete("event_type", fields("ctx_delivered"))
+    ];
 fields("ctx_dropped") ->
     [
-        {"event_type", sc(message_dropped, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(message_dropped)},
         {"id", sc(binary(), #{desc => ?DESC("event_id")})},
-        {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
-        {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
-        {"username", sc(binary(), #{desc => ?DESC("event_username")})},
-        {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
-        {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
-        {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
-        {"publish_received_at",
-            sc(integer(), #{
-                desc => ?DESC("event_publish_received_at")
-            })}
-    ] ++ [qos()];
+        {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}
+        | msg_event_common_fields()
+    ];
 fields("ctx_connected") ->
     [
-        {"event_type",
-            sc(client_connected, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(client_connected)},
         {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
         {"username", sc(binary(), #{desc => ?DESC("event_username")})},
         {"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})},
@@ -239,8 +202,7 @@ fields("ctx_connected") ->
     ];
 fields("ctx_disconnected") ->
     [
-        {"event_type",
-            sc(client_disconnected, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(client_disconnected)},
         {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
         {"username", sc(binary(), #{desc => ?DESC("event_username")})},
         {"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})},
@@ -253,7 +215,7 @@ fields("ctx_disconnected") ->
     ];
 fields("ctx_connack") ->
     [
-        {"event_type", sc(client_connack, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(client_connack)},
         {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})},
         {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
         {"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
@@ -271,8 +233,7 @@ fields("ctx_connack") ->
     ];
 fields("ctx_check_authz_complete") ->
     [
-        {"event_type",
-            sc(client_check_authz_complete, #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc(client_check_authz_complete)},
         {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
         {"username", sc(binary(), #{desc => ?DESC("event_username")})},
         {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
@@ -283,19 +244,16 @@ fields("ctx_check_authz_complete") ->
     ];
 fields("ctx_bridge_mqtt") ->
     [
-        {"event_type",
-            sc('$bridges/mqtt:*', #{desc => ?DESC("event_event_type"), required => true})},
+        {"event_type", event_type_sc('$bridges/mqtt:*')},
         {"id", sc(binary(), #{desc => ?DESC("event_id")})},
         {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
         {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
         {"server", sc(binary(), #{desc => ?DESC("event_server")})},
         {"dup", sc(binary(), #{desc => ?DESC("event_dup")})},
         {"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
-        {"message_received_at",
-            sc(integer(), #{
-                desc => ?DESC("event_publish_received_at")
-            })}
-    ] ++ [qos()].
+        {"message_received_at", publish_received_at_sc()},
+        qos()
+    ].
 
 qos() ->
     {"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}.
@@ -312,4 +270,22 @@ rule_id() ->
         )}.
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
 ref(Field) -> hoconsc:ref(?MODULE, Field).
+
+event_type_sc(Event) ->
+    sc(Event, #{desc => ?DESC("event_event_type"), required => true}).
+
+publish_received_at_sc() ->
+    sc(integer(), #{desc => ?DESC("event_publish_received_at")}).
+
+msg_event_common_fields() ->
+    [
+        {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
+        {"username", sc(binary(), #{desc => ?DESC("event_username")})},
+        {"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
+        {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
+        {"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
+        {"publish_received_at", publish_received_at_sc()},
+        qos()
+    ].

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -2,7 +2,7 @@
 {application, emqx_rule_engine, [
     {description, "EMQX Rule Engine"},
     % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
     {applications, [kernel, stdlib, rulesql, getopt]},

+ 53 - 25
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -28,6 +28,7 @@
     unload/1,
     event_names/0,
     event_name/1,
+    event_topics_enum/0,
     event_topic/1,
     eventmsg_publish/1
 ]).
@@ -78,6 +79,22 @@ event_names() ->
         'delivery.dropped'
     ].
 
+%% for documentation purposes
+event_topics_enum() ->
+    [
+        '$events/client_connected',
+        '$events/client_disconnected',
+        '$events/client_connack',
+        '$events/client_check_authz_complete',
+        '$events/session_subscribed',
+        '$events/session_unsubscribed',
+        '$events/message_delivered',
+        '$events/message_acked',
+        '$events/message_dropped',
+        '$events/delivery_dropped'
+        % '$events/message_publish' % not possible to use in SELECT FROM
+    ].
+
 reload() ->
     lists:foreach(
         fun(Rule) ->
@@ -88,21 +105,22 @@ reload() ->
 
 load(Topic) ->
     HookPoint = event_name(Topic),
+    HookFun = hook_fun_name(HookPoint),
     emqx_hooks:put(
-        HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
+        HookPoint, {?MODULE, HookFun, [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
     ).
 
 unload() ->
     lists:foreach(
         fun(HookPoint) ->
-            emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
+            emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)})
         end,
         event_names()
     ).
 
 unload(Topic) ->
     HookPoint = event_name(Topic),
-    emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
+    emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}).
 
 %%--------------------------------------------------------------------
 %% Callbacks
@@ -987,15 +1005,25 @@ columns_example_props_specific(unsub_props) ->
 %% Helper functions
 %%--------------------------------------------------------------------
 
-hook_fun(<<"$bridges/", _/binary>>) ->
-    on_bridge_message_received;
-hook_fun(Event) ->
-    case string:split(atom_to_list(Event), ".") of
-        [Prefix, Name] ->
-            list_to_atom(lists:append(["on_", Prefix, "_", Name]));
-        [_] ->
-            error(invalid_event, Event)
-    end.
+hook_fun_name(HookPoint) ->
+    HookFun = hook_fun(HookPoint),
+    {name, HookFunName} = erlang:fun_info(HookFun, name),
+    HookFunName.
+
+%% return static function references to help static code checks
+hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2;
+hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
+hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
+hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
+hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6;
+hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4;
+hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
+hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
+hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
+hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
+hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
+hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
+hook_fun(Event) -> error({invalid_event, Event}).
 
 reason(Reason) when is_atom(Reason) -> Reason;
 reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
@@ -1006,19 +1034,20 @@ ntoa(undefined) -> undefined;
 ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
 ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
 
-event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
-event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
-event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
-event_name(<<"$events/client_check_authz_complete", _/binary>>) -> 'client.check_authz_complete';
-event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
-event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed';
-event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
-event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
-event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
-event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
-event_name(<<"$bridges/", _/binary>> = Topic) -> Topic;
+event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
+event_name(<<"$events/client_connected">>) -> 'client.connected';
+event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
+event_name(<<"$events/client_connack">>) -> 'client.connack';
+event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete';
+event_name(<<"$events/session_subscribed">>) -> 'session.subscribed';
+event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
+event_name(<<"$events/message_delivered">>) -> 'message.delivered';
+event_name(<<"$events/message_acked">>) -> 'message.acked';
+event_name(<<"$events/message_dropped">>) -> 'message.dropped';
+event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
 event_name(_) -> 'message.publish'.
 
+event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
 event_topic('client.connected') -> <<"$events/client_connected">>;
 event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
 event_topic('client.connack') -> <<"$events/client_connack">>;
@@ -1029,8 +1058,7 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
 event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
-event_topic('message.publish') -> <<"$events/message_publish">>;
-event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic.
+event_topic('message.publish') -> <<"$events/message_publish">>.
 
 printable_maps(undefined) ->
     #{};

+ 33 - 13
apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl

@@ -8,19 +8,18 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 t_mod_hook_fun(_) ->
-    Funcs = emqx_rule_events:module_info(exports),
-    [
-        ?assert(lists:keymember(emqx_rule_events:hook_fun(Event), 1, Funcs))
-     || Event <- [
-            'client.connected',
-            'client.disconnected',
-            'session.subscribed',
-            'session.unsubscribed',
-            'message.acked',
-            'message.dropped',
-            'message.delivered'
-        ]
-    ].
+    Events = emqx_rule_events:event_names(),
+    lists:foreach(
+        fun(E) ->
+            ?assert(is_function(emqx_rule_events:hook_fun(E)))
+        end,
+        Events
+    ),
+    ?assertEqual(
+        fun emqx_rule_events:on_bridge_message_received/2,
+        emqx_rule_events:hook_fun(<<"$bridges/foo">>)
+    ),
+    ?assertError({invalid_event, foo}, emqx_rule_events:hook_fun(foo)).
 
 t_printable_maps(_) ->
     Headers = #{
@@ -42,3 +41,24 @@ t_printable_maps(_) ->
     ?assertNot(maps:is_key(redispatch_to, Converted)),
     ?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
     ok.
+
+t_event_name_topic_conversion(_) ->
+    Events = emqx_rule_events:event_names() -- ['message.publish'],
+    Topics = [atom_to_binary(A) || A <- emqx_rule_events:event_topics_enum()],
+    Zip = lists:zip(Events, Topics),
+    lists:foreach(
+        fun({Event, Topic}) ->
+            ?assertEqual(Event, emqx_rule_events:event_name(Topic)),
+            ?assertEqual(Topic, emqx_rule_events:event_topic(Event))
+        end,
+        Zip
+    ).
+
+t_special_events_name_topic_conversion(_) ->
+    Bridge = <<"$bridges/foo:bar">>,
+    AdHoc = <<"foo/bar">>,
+    NonExisting = <<"$events/message_publish">>,
+    ?assertEqual(Bridge, emqx_rule_events:event_name(Bridge)),
+    ?assertEqual('message.publish', emqx_rule_events:event_name(AdHoc)),
+    ?assertEqual('message.publish', emqx_rule_events:event_name(NonExisting)),
+    ?assertEqual(NonExisting, emqx_rule_events:event_topic('message.publish')).