Преглед изворни кода

Merge pull request #6162 from HJianBo/port-44-exhook-feat

Pass message header to 'on_message_publish' hook
JianBo He пре 4 година
родитељ
комит
d551c7977d

+ 2 - 1
apps/emqx/src/emqx_types.erl

@@ -192,7 +192,8 @@
                      username   => username(),
                      peerhost   => peerhost(),
                      properties => properties(),
-                     atom()     => term()}).
+                     allow_publish => boolean(),
+                     atom() => term()}).
 
 -type(banned() :: #banned{}).
 -type(deliver() :: {deliver, topic(), message()}).

+ 6 - 0
apps/emqx_exhook/etc/emqx_exhook.conf

@@ -25,6 +25,12 @@ exhook {
     ## Value: false | Duration
     auto_reconnect = 60s
 
+    ## The process pool size for gRPC client
+    ##
+    ## Default: Equals cpu cores
+    ## Value: Integer
+    #pool_size = 16
+
     servers = [
     #    { name: "default"
     #      url: "http://127.0.0.1:9000"

+ 25 - 0
apps/emqx_exhook/priv/protos/exhook.proto

@@ -358,6 +358,31 @@ message Message {
   bytes  payload = 6;
 
   uint64 timestamp = 7;
+
+  // The key of header can be:
+  //  - username:
+  //    * Readonly
+  //    * The username of sender client
+  //    * Value type: utf8 string
+  //  - protocol:
+  //    * Readonly
+  //    * The protocol name of sender client
+  //    * Value type: string enum with "mqtt", "mqtt-sn", ...
+  //  - peerhost:
+  //    * Readonly
+  //    * The peerhost of sender client
+  //    * Value type: ip address string
+  //  - allow_publish:
+  //    * Writable
+  //    * Whether to allow the message to be published by emqx
+  //    * Value type: string enum with "true", "false", default is "true"
+  //
+  // Notes: All header may be missing, which means that the message does not
+  //   carry these headers. We can guarantee that clients coming from MQTT,
+  //   MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
+  //   carry these headers, but there is no guarantee that messages published
+  //   by other means will do, e.g. messages published by HTTP-API
+  map<string, string> headers = 8;
 }
 
 message Property {

+ 1 - 1
apps/emqx_exhook/rebar.config

@@ -5,7 +5,7 @@
 ]}.
 
 {deps,
- [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
+ [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
 ]}.
 
 {grpc,

+ 54 - 15
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -49,6 +49,7 @@
 
 %% Utils
 -export([ message/1
+        , headers/1
         , stringfy/1
         , merge_responsed_bool/2
         , merge_responsed_message/2
@@ -61,6 +62,8 @@
         , call_fold/3
         ]).
 
+-elvis([{elvis_style, god_modules, disable}]).
+
 %%--------------------------------------------------------------------
 %% Clients
 %%--------------------------------------------------------------------
@@ -257,17 +260,58 @@ clientinfo(ClientInfo =
       cn => maybe(maps:get(cn, ClientInfo, undefined)),
       dn => maybe(maps:get(dn, ClientInfo, undefined))}.
 
-message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
+message(#message{id = Id, qos = Qos, from = From, topic = Topic,
+                 payload = Payload, timestamp = Ts, headers = Headers}) ->
     #{node => stringfy(node()),
       id => emqx_guid:to_hexstr(Id),
       qos => Qos,
       from => stringfy(From),
       topic => Topic,
       payload => Payload,
-      timestamp => Ts}.
-
-assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
-    Message#message{qos = Qos, topic = Topic, payload = Payload}.
+      timestamp => Ts,
+      headers => headers(Headers)
+     }.
+
+headers(Headers) ->
+    Ls = [username, protocol, peerhost, allow_publish],
+    maps:fold(
+      fun
+          (_, undefined, Acc) ->
+              Acc; %% Ignore undefined value
+          (K, V, Acc) ->
+              case lists:member(K, Ls) of
+                  true ->
+                      Acc#{atom_to_binary(K) => bin(K, V)};
+                  _ ->
+                      Acc
+              end
+    end, #{}, Headers).
+
+bin(K, V) when K == username;
+               K == protocol;
+               K == allow_publish ->
+    bin(V);
+bin(peerhost, V) ->
+    bin(inet:ntoa(V)).
+
+bin(V) when is_binary(V) -> V;
+bin(V) when is_atom(V) -> atom_to_binary(V);
+bin(V) when is_list(V) -> iolist_to_binary(V).
+
+assign_to_message(InMessage = #{qos := Qos, topic := Topic,
+                                payload := Payload}, Message) ->
+    NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
+    enrich_header(maps:get(headers, InMessage, #{}), NMsg).
+
+enrich_header(Headers, Message) ->
+    case maps:get(<<"allow_publish">>, Headers, undefined) of
+        <<"false">> ->
+            emqx_message:set_header(allow_publish, false, Message);
+        <<"true">> ->
+            emqx_message:set_header(allow_publish, true, Message);
+        _ ->
+            Message
+    end.
 
 topicfilters(Tfs) when is_list(Tfs) ->
     [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
@@ -298,11 +342,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
     ignore;
 merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
   when is_boolean(NewBool) ->
-    NReq = Req#{result => NewBool},
-    case Type of
-        'CONTINUE' -> {ok, NReq};
-        'STOP_AND_RETURN' -> {stop, NReq}
-    end;
+    {ret(Type), Req#{result => NewBool}};
 merge_responsed_bool(_Req, Resp) ->
     ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
     ignore.
@@ -310,11 +350,10 @@ merge_responsed_bool(_Req, Resp) ->
 merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
     ignore;
 merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
-    NReq = Req#{message => NMessage},
-    case Type of
-        'CONTINUE' -> {ok, NReq};
-        'STOP_AND_RETURN' -> {stop, NReq}
-    end;
+    {ret(Type), Req#{message => NMessage}};
 merge_responsed_message(_Req, Resp) ->
     ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
     ignore.
+
+ret('CONTINUE') -> ok;
+ret('STOP_AND_RETURN') -> stop.

+ 14 - 1
apps/emqx_exhook/src/emqx_exhook_mngr.erl

@@ -36,6 +36,8 @@
         , server/1
         , put_request_failed_action/1
         , get_request_failed_action/0
+        , put_pool_size/1
+        , get_pool_size/0
         ]).
 
 %% gen_server callbacks
@@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
     put_request_failed_action(
       maps:get(request_failed_action, ReqOpts0, deny)
      ),
+    put_pool_size(
+      maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers))
+     ),
 
     %% Load the hook servers
     ReqOpts = maps:without([request_failed_action], ReqOpts0),
@@ -136,7 +141,7 @@ load_all_servers(Servers, ReqOpts) ->
     load_all_servers(Servers, ReqOpts, #{}, #{}).
 load_all_servers([], _Request, Waiting, Running) ->
     {Waiting, Running};
-load_all_servers([#{name := Name0} = Options0|More], ReqOpts, Waiting, Running) ->
+load_all_servers([#{name := Name0} = Options0 | More], ReqOpts, Waiting, Running) ->
     Name = iolist_to_binary(Name0),
     Options = Options0#{name => Name},
     {NWaiting, NRunning} =
@@ -291,6 +296,14 @@ put_request_failed_action(Val) ->
 get_request_failed_action() ->
     persistent_term:get({?APP, request_failed_action}).
 
+put_pool_size(Val) ->
+    persistent_term:put({?APP, pool_size}, Val).
+
+get_pool_size() ->
+    %% Avoid the scenario that the parameter is not set after
+    %% the hot upgrade completed.
+    persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)).
+
 save(Name, ServerState) ->
     Saved = persistent_term:get(?APP, []),
     persistent_term:put(?APP, lists:reverse([Name | Saved])),

+ 4 - 0
apps/emqx_exhook/src/emqx_exhook_schema.erl

@@ -49,6 +49,10 @@ fields(exhook) ->
        sc(hoconsc:union([false, duration()]),
           #{ default => "60s"
            })}
+    , {pool_size,
+       sc(integer(),
+          #{ nullable => true
+           })}
     , {servers,
        sc(hoconsc:array(ref(servers)),
           #{default => []})}

+ 24 - 14
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -75,6 +75,8 @@
 
 -dialyzer({nowarn_function, [inc_metrics/2]}).
 
+-elvis([{elvis_style, dont_repeat_yourself, disable}]).
+
 %%--------------------------------------------------------------------
 %% Load/Unload APIs
 %%--------------------------------------------------------------------
@@ -108,9 +110,10 @@ load(Name, Opts0, ReqOpts) ->
 
 %% @private
 channel_opts(Opts = #{url := URL}) ->
+    ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()},
     case uri_string:parse(URL) of
         #{scheme := "http", host := Host, port := Port} ->
-            {format_http_uri("http", Host, Port), #{}};
+            {format_http_uri("http", Host, Port), ClientOpts};
         #{scheme := "https", host := Host, port := Port} ->
             SslOpts =
                 case maps:get(ssl, Opts, undefined) of
@@ -122,8 +125,12 @@ channel_opts(Opts = #{url := URL}) ->
                            {keyfile, maps:get(keyfile, MapOpts, undefined)}
                           ])
                 end,
-            {format_http_uri("https", Host, Port),
-             #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}};
+            NClientOpts = ClientOpts#{
+                            gun_opts =>
+                              #{transport => ssl,
+                                transport_opts => SslOpts}
+                           },
+            {format_http_uri("https", Host, Port), NClientOpts};
         _ ->
             error(bad_server_url)
     end.
@@ -173,16 +180,19 @@ resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
         case maps:get(name, HookSpec, undefined) of
             undefined -> Acc;
             Name0 ->
-                Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
-                case lists:member(Name, AvailableHooks) of
-                    true ->
-                        case lists:member(Name, MessageHooks) of
-                            true ->
-                                Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
-                            _ ->
-                                Acc#{Name => #{}}
-                        end;
-                    _ -> error({unknown_hookpoint, Name})
+                Name = try
+                           binary_to_existing_atom(Name0, utf8)
+                       catch T:R:_ -> {T,R}
+                       end,
+                case {lists:member(Name, AvailableHooks),
+                      lists:member(Name, MessageHooks)} of
+                    {false, _} ->
+                        error({unknown_hookpoint, Name});
+                    {true, false} ->
+                        Acc#{Name => #{}};
+                    {true, true} ->
+                        Acc#{Name => #{
+                              topics => maps:get(topics, HookSpec, [])}}
                 end
         end
     end, #{}, HookSpecs).
@@ -255,7 +265,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
 %% @private
 inc_metrics(IncFun, Name) when is_function(IncFun) ->
     %% BACKW: e4.2.0-e4.2.2
-    {env, [Prefix|_]} = erlang:fun_info(IncFun, env),
+    {env, [Prefix | _]} = erlang:fun_info(IncFun, env),
     inc_metrics(Prefix, Name);
 inc_metrics(Prefix, Name) when is_list(Prefix) ->
     emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).

+ 3 - 2
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -54,7 +54,8 @@ auto_reconnect() ->
 
 request_options() ->
     #{timeout => env(request_timeout, 5000),
-      request_failed_action => env(request_failed_action, deny)
+      request_failed_action => env(request_failed_action, deny),
+      pool_size => env(pool_size, erlang:system_info(schedulers))
      }.
 
 env(Key, Def) ->
@@ -67,7 +68,7 @@ env(Key, Def) ->
 -spec start_grpc_client_channel(
         binary(),
         uri_string:uri_string(),
-        grpc_client:options()) -> {ok, pid()} | {error, term()}.
+        grpc_client_sup:options()) -> {ok, pid()} | {error, term()}.
 start_grpc_client_channel(Name, SvrAddr, Options) ->
     grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).
 

+ 14 - 4
apps/emqx_exhook/test/emqx_exhook_demo_svr.erl

@@ -299,21 +299,31 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
     %% some cases for testing
     case From of
         <<"baduser">> ->
-            NMsg = Msg#{qos => 0,
+            NMsg = deny(Msg#{qos => 0,
                         topic => <<"">>,
                         payload => <<"">>
-                       },
+                       }),
             {ok, #{type => 'STOP_AND_RETURN',
                    value => {message, NMsg}}, Md};
         <<"gooduser">> ->
-            NMsg = Msg#{topic => From,
-                        payload => From},
+            NMsg = allow(Msg#{topic => From,
+                              payload => From}),
             {ok, #{type => 'STOP_AND_RETURN',
                    value => {message, NMsg}}, Md};
         _ ->
             {ok, #{type => 'IGNORE'}, Md}
     end.
 
+deny(Msg) ->
+    NHeader = maps:put(<<"allow_publish">>, <<"false">>,
+                       maps:get(headers, Msg, #{})),
+    maps:put(headers, NHeader, Msg).
+
+allow(Msg) ->
+    NHeader = maps:put(<<"allow_publish">>, <<"true">>,
+                       maps:get(headers, Msg, #{})),
+    maps:put(headers, NHeader, Msg).
+
 -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
     -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
      | {error, grpc_cowboy_h:error_response()}.

+ 13 - 6
apps/emqx_exhook/test/props/prop_exhook_hooks.erl

@@ -296,19 +296,24 @@ prop_message_publish() ->
                 _ ->
                     ExpectedOutMsg = case emqx_message:from(Msg) of
                                          <<"baduser">> ->
-                                             MsgMap = emqx_message:to_map(Msg),
+                                             MsgMap = #{headers := Headers}
+                                                    = emqx_message:to_map(Msg),
                                              emqx_message:from_map(
                                                MsgMap#{qos => 0,
                                                        topic => <<"">>,
-                                                       payload => <<"">>
+                                                       payload => <<"">>,
+                                                       headers => maps:put(allow_publish, false, Headers)
                                                       });
                                          <<"gooduser">> = From ->
-                                             MsgMap = emqx_message:to_map(Msg),
+                                             MsgMap = #{headers := Headers}
+                                                    = emqx_message:to_map(Msg),
                                              emqx_message:from_map(
                                                MsgMap#{topic => From,
-                                                       payload => From
+                                                       payload => From,
+                                                       headers => maps:put(allow_publish, true, Headers)
                                                       });
-                                         _ -> Msg
+                                         _ ->
+                                             Msg
                                      end,
                     ?assertEqual(ExpectedOutMsg, OutMsg),
 
@@ -461,7 +466,9 @@ from_message(Msg) ->
       from => stringfy(emqx_message:from(Msg)),
       topic => emqx_message:topic(Msg),
       payload => emqx_message:payload(Msg),
-      timestamp => emqx_message:timestamp(Msg)
+      timestamp => emqx_message:timestamp(Msg),
+      headers => emqx_exhook_handler:headers(
+                  emqx_message:get_headers(Msg))
      }.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_gateway/rebar.config

@@ -1,6 +1,6 @@
 {erl_opts, [debug_info]}.
 {deps, [
-  {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
+  {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
 ]}.
 
 {plugins, [