Procházet zdrojové kódy

feat(exhook): expose headers for on_messages_publish hook

JianBo He před 4 roky
rodič
revize
20ba9d285f

+ 2 - 1
apps/emqx/src/emqx_types.erl

@@ -192,7 +192,8 @@
                      username   => username(),
                      peerhost   => peerhost(),
                      properties => properties(),
-                     atom()     => term()}).
+                     allow_publish => boolean(),
+                     atom() => term()}).
 
 -type(banned() :: #banned{}).
 -type(deliver() :: {deliver, topic(), message()}).

+ 25 - 0
apps/emqx_exhook/priv/protos/exhook.proto

@@ -358,6 +358,31 @@ message Message {
   bytes  payload = 6;
 
   uint64 timestamp = 7;
+
+  // The key of header can be:
+  //  - username:
+  //    * Readonly
+  //    * The username of sender client
+  //    * Value type: utf8 string
+  //  - protocol:
+  //    * Readonly
+  //    * The protocol name of sender client
+  //    * Value type: string enum with "mqtt", "mqtt-sn", ...
+  //  - peerhost:
+  //    * Readonly
+  //    * The peerhost of sender client
+  //    * Value type: ip address string
+  //  - allow_publish:
+  //    * Writable
+  //    * Whether to allow the message to be published by emqx
+  //    * Value type: string enum with "true", "false", default is "true"
+  //
+  // Notes: All header may be missing, which means that the message does not
+  //   carry these headers. We can guarantee that clients coming from MQTT,
+  //   MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
+  //   carry these headers, but there is no guarantee that messages published
+  //   by other means will do, e.g. messages published by HTTP-API
+  map<string, string> headers = 8;
 }
 
 message Property {

+ 45 - 5
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -257,17 +257,57 @@ clientinfo(ClientInfo =
       cn => maybe(maps:get(cn, ClientInfo, undefined)),
       dn => maybe(maps:get(dn, ClientInfo, undefined))}.
 
-message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
+message(#message{id = Id, qos = Qos, from = From, topic = Topic,
+                 payload = Payload, timestamp = Ts, headers = Headers}) ->
     #{node => stringfy(node()),
       id => emqx_guid:to_hexstr(Id),
       qos => Qos,
       from => stringfy(From),
       topic => Topic,
       payload => Payload,
-      timestamp => Ts}.
-
-assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
-    Message#message{qos = Qos, topic = Topic, payload = Payload}.
+      timestamp => Ts,
+      headers => headers(Headers)
+     }.
+
+headers(undefined) ->
+    #{};
+headers(Headers) ->
+    Ls = [username, protocol, peerhost, allow_publish],
+    maps:fold(
+      fun
+          (_, undefined, Acc) ->
+              Acc; %% Ignore undefined value
+          (K, V, Acc) ->
+              case lists:member(K, Ls) of
+                  true ->
+                      Acc#{atom_to_binary(K) => bin(K, V)};
+                  _ ->
+                      Acc
+              end
+    end, #{}, Headers).
+
+bin(K, V) when K == username;
+               K == protocol;
+               K == allow_publish ->
+    bin(V);
+bin(peerhost, V) ->
+    bin(inet:ntoa(V)).
+
+bin(V) when is_binary(V) -> V;
+bin(V) when is_atom(V) -> atom_to_binary(V);
+bin(V) when is_list(V) -> iolist_to_binary(V).
+
+assign_to_message(InMessage = #{qos := Qos, topic := Topic,
+                                payload := Payload}, Message) ->
+    NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
+    enrich_header(maps:get(headers, InMessage, #{}), NMsg).
+
+enrich_header(Headers, Message) ->
+    AllowPub = case maps:get(<<"allow_publish">>, Headers, <<"true">>) of
+                   <<"false">> -> false;
+                   _ -> true
+               end,
+    emqx_message:set_header(allow_publish, AllowPub, Message).
 
 topicfilters(Tfs) when is_list(Tfs) ->
     [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].

+ 8 - 3
apps/emqx_exhook/test/emqx_exhook_demo_svr.erl

@@ -295,14 +295,14 @@ on_session_terminated(Req, Md) ->
      | {error, grpc_cowboy_h:error_response()}.
 on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
     ?MODULE:in({?FUNCTION_NAME, Req}),
-    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    io:format(standard_error, "fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
     %% some cases for testing
     case From of
         <<"baduser">> ->
-            NMsg = Msg#{qos => 0,
+            NMsg = deny(Msg#{qos => 0,
                         topic => <<"">>,
                         payload => <<"">>
-                       },
+                       }),
             {ok, #{type => 'STOP_AND_RETURN',
                    value => {message, NMsg}}, Md};
         <<"gooduser">> ->
@@ -314,6 +314,11 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
             {ok, #{type => 'IGNORE'}, Md}
     end.
 
+deny(Msg) ->
+    NHeader = maps:put(<<"allow_publish">>, <<"false">>,
+                       maps:get(headers, Msg, #{})),
+    maps:put(headers, NHeader, Msg).
+
 -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
     -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
      | {error, grpc_cowboy_h:error_response()}.