Преглед изворни кода

Merge pull request #6142 from HJianBo/put-message-headers

Fill the message headers
Zaiming (Stone) Shi пре 4 година
родитељ
комит
5f8d9db64b

+ 1 - 1
apps/emqx_coap/src/emqx_coap.app.src

@@ -1,6 +1,6 @@
 {application, emqx_coap,
  [{description, "EMQ X CoAP Gateway"},
-  {vsn, "4.3.0"}, % strict semver, bump manually!
+  {vsn, "4.3.1"}, % strict semver, bump manually!
   {modules, []},
   {registered, []},
   {applications, [kernel,stdlib,gen_coap]},

+ 9 - 0
apps/emqx_coap/src/emqx_coap.appup.src

@@ -0,0 +1,9 @@
+%% -*-: erlang -*-
+{VSN,
+  [{"4.3.0",[
+    {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
+   {<<".*">>, []}],
+  [{"4.3.0",[
+    {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
+   {<<".*">>, []}]
+}.

+ 19 - 7
apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl

@@ -58,6 +58,8 @@
 
 -define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}).
 
+-define(PROTO_VER, 1).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -139,7 +141,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL
     NewTopics = proplists:delete(Topic, TopicList),
     IsWild = emqx_topic:wildcard(Topic),
     {reply, chann_subscribe(Topic, State), State#state{sub_topics =
-        [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
+        [{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate};
 
 handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
     NewTopics = proplists:delete(Topic, TopicList),
@@ -244,15 +246,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
     case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
         allow ->
             _ = emqx_broker:publish(
-                    emqx_message:set_flag(retain, false,
-                        emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
-            ok;
+                  packet_to_message(Topic, Payload, State)), ok;
         deny  ->
             ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
                  [Topic, ClientId]),
             {error, forbidden}
     end.
 
+packet_to_message(Topic, Payload,
+                  #state{clientid = ClientId,
+                         username = Username,
+                         peername = {PeerHost, _}}) ->
+    Message = emqx_message:set_flag(
+                retain, false,
+                emqx_message:make(ClientId, ?QOS_0, Topic, Payload)
+               ),
+    emqx_message:set_headers(
+      #{ proto_ver => ?PROTO_VER
+       , protocol => coap
+       , username => Username
+       , peerhost => PeerHost}, Message).
 
 %%--------------------------------------------------------------------
 %% Deliver
@@ -270,7 +283,7 @@ do_deliver({Topic, Payload}, Subscribers) ->
 
 deliver_to_coap(_TopicName, _Payload, []) ->
     ok;
-deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
+deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
     Matched =   case IsWild of
                     true  -> emqx_topic:match(TopicName, TopicFilter);
                     false -> TopicName =:= TopicFilter
@@ -324,7 +337,7 @@ conninfo(#state{peername = Peername,
       peercert => nossl,        %% TODO: dtls
       conn_mod => ?MODULE,
       proto_name => <<"CoAP">>,
-      proto_ver => 1,
+      proto_ver => ?PROTO_VER,
       clean_start => true,
       clientid => ClientId,
       username => undefined,
@@ -384,4 +397,3 @@ clientinfo(#state{peername = {PeerHost, _},
       mountpoint => undefined,
       ws_cookie  => undefined
      }.
-

+ 1 - 1
apps/emqx_exproto/src/emqx_exproto.app.src

@@ -1,6 +1,6 @@
 {application, emqx_exproto,
  [{description, "EMQ X Extension for Protocol"},
-  {vsn, "4.3.4"}, %% 4.3.3 is used by ee
+  {vsn, "4.3.5"}, %% 4.3.3 is used by ee
   {modules, []},
   {registered, []},
   {mod, {emqx_exproto_app, []}},

+ 6 - 2
apps/emqx_exproto/src/emqx_exproto.appup.src

@@ -1,6 +1,8 @@
 %% -*- mode: erlang -*-
 {VSN,
-  [{"4.3.3",
+  [{"4.3.4",
+    [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.3",
     [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
      {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
    {"4.3.2",
@@ -12,7 +14,9 @@
      {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
      {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
    {<<".*">>,[]}],
-  [{"4.3.3",
+  [{"4.3.4",
+    [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
+   {"4.3.3",
     [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
      {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
    {"4.3.2",

+ 21 - 6
apps/emqx_exproto/src/emqx_exproto_channel.erl

@@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter},
 handle_call({publish, Topic, Qos, Payload},
             Channel = #channel{
                          conn_state = connected,
-                         clientinfo = ClientInfo
-                                    = #{clientid := From,
-                                        mountpoint := Mountpoint}}) ->
+                         clientinfo = ClientInfo}) ->
     case is_acl_enabled(ClientInfo) andalso
          emqx_access_control:check_acl(ClientInfo, publish, Topic) of
         deny ->
             {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
         _ ->
-            Msg = emqx_message:make(From, Qos, Topic, Payload),
-            NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
-            _ = emqx:publish(NMsg),
+            Msg = packet_to_message(Topic, Qos, Payload, Channel),
+            _ = emqx:publish(Msg),
             {reply, ok, Channel}
     end;
 
@@ -419,6 +416,24 @@ is_anonymous(_AuthResult)          -> false.
 clean_anonymous_clients() ->
     ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
 
+packet_to_message(Topic, Qos, Payload,
+                  #channel{
+                     conninfo = #{proto_ver := ProtoVer},
+                     clientinfo = #{
+                         protocol := Protocol,
+                         clientid := ClientId,
+                         username := Username,
+                         peerhost := PeerHost,
+                         mountpoint := Mountpoint}}) ->
+    Msg = emqx_message:make(
+            ClientId, Qos,
+            Topic, Payload, #{},
+            #{proto_ver => ProtoVer,
+              protocol => Protocol,
+              username => Username,
+              peerhost => PeerHost}),
+    emqx_mountpoint:mount(Mountpoint, Msg).
+
 %%--------------------------------------------------------------------
 %% Sub/UnSub
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_lwm2m/src/emqx_lwm2m.app.src

@@ -1,6 +1,6 @@
 {application,emqx_lwm2m,
              [{description,"EMQ X LwM2M Gateway"},
-              {vsn, "4.3.4"}, % strict semver, bump manually!
+              {vsn, "4.3.5"}, % strict semver, bump manually!
               {modules,[]},
               {registered,[emqx_lwm2m_sup]},
               {applications,[kernel,stdlib,lwm2m_coap]},

+ 9 - 3
apps/emqx_lwm2m/src/emqx_lwm2m.appup.src

@@ -1,5 +1,5 @@
 %% -*-: erlang -*-
-{"4.3.4",
+{VSN,
   [
     {<<"4\\.3\\.[0-1]">>, [
       {restart_application, emqx_lwm2m}
@@ -7,7 +7,10 @@
     {"4.3.2", [
       {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
     ]},
-    {"4.3.3", []} %% only config change
+    {"4.3.3", []}, %% only config change
+    {"4.3.4", [
+      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
+    ]}
   ],
   [
     {<<"4\\.3\\.[0-1]">>, [
@@ -16,6 +19,9 @@
     {"4.3.2", [
       {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
     ]},
-    {"4.3.3", []} %% only config change
+    {"4.3.3", []}, %% only config change
+    {"4.3.4", [
+      {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
+    ]}
   ]
 }.

+ 46 - 19
apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl

@@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) ->
         Error -> {error, Error}
     end.
 
-init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
+init(CoapPid, EndpointName, Peername = {_Peerhost, _Port},
+     RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
     Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
     Lwm2mState = #lwm2m_state{peername = Peername,
                               endpoint_name = EndpointName,
@@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
                 emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
             end),
             emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
-	    emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
+            emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
 
-            {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
+            NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}),
+            {ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}};
         {error, Error} ->
             _ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
             {error, Error}
@@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
             %% - report the registration info update, but only when objectList is updated.
             case NewRegInfo of
                 #{<<"objectList">> := _} ->
-		    emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
+                    emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
                     send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
                 _ -> ok
             end
@@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload},
                                   started_at = StartedAt,
                                   endpoint_name = EndpointName}) ->
     IsCacheMode = is_cache_mode(RegInfo, StartedAt),
-    ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
+    ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, "
+                "Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
     AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
     deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
     Lwm2mState.
@@ -235,8 +238,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
     emqx_broker:unsubscribe(Topic),
     emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
 
-publish(Topic, Payload, Qos, EndpointName) ->
-    emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))).
+publish(Topic, Payload, Qos,
+        #lwm2m_state{
+           version = ProtoVer,
+           peername = {PeerHost, _},
+           endpoint_name = EndpointName}) ->
+    Message = emqx_message:set_flag(
+                retain, false,
+                emqx_message:make(EndpointName, Qos, Topic, Payload)
+               ),
+    NMessage = emqx_message:set_headers(
+                 #{proto_ver => ProtoVer,
+                   protocol => lwm2m,
+                   peerhost => PeerHost}, Message),
+    emqx_broker:publish(NMessage).
 
 time_now() -> erlang:system_time(millisecond).
 
@@ -244,7 +259,8 @@ time_now() -> erlang:system_time(millisecond).
 %% Deliver downlink message to coap
 %%--------------------------------------------------------------------
 
-deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
+deliver_to_coap(AlternatePath, JsonData,
+                CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
     try
         TermData = emqx_json:decode(JsonData, [return_maps]),
         deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
@@ -273,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when
 send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
     do_send_to_broker(EventType, Payload, Lwm2mState).
 
-do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
+do_send_to_broker(EventType, #{<<"data">> := Data} = Payload,
+                  #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
     ReqPath = maps:get(<<"reqPath">>, Data, undefined),
     Code = maps:get(<<"code">>, Data, undefined),
     CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
@@ -281,7 +298,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo
     emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
     NewPayload = maps:put(<<"msgType">>, EventType, Payload),
     Topic = uplink_topic(EventType, Lwm2mState),
-    publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
+    publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState).
 
 %%--------------------------------------------------------------------
 %% Auto Observe
@@ -315,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
 
 observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
     lists:foreach(fun(ObjectPath) ->
-        [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
+        [ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
         case ObjId of
             <<"19">> ->
                 [ObjInsId | _LastPath1] = LastPath,
                 case ObjInsId of
                     <<"0">> ->
-                        observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
+                        observe_object_slowly(
+                          AlternatePath, <<"/19/0/0">>,
+                          CoapPid, 100, EndpointName
+                         );
                     _ ->
-                        observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+                        observe_object_slowly(
+                          AlternatePath, ObjectPath,
+                          CoapPid, 100, EndpointName
+                         )
                 end;
             _ ->
-                observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
+                observe_object_slowly(
+                  AlternatePath, ObjectPath,
+                  CoapPid, 100, EndpointName
+                 )
         end
     end, ObjectList).
 
@@ -380,11 +406,12 @@ get_cached_downlink_messages() ->
 is_cache_mode(RegInfo, StartedAt) ->
     case is_psm(RegInfo) orelse is_qmode(RegInfo) of
         true ->
-            QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22),
-            Now = time_now(),
-            if (Now - StartedAt) >= QModeTimeWind -> true;
-                true -> false
-            end;
+            QModeTimeWind = proplists:get_value(
+                              qmode_time_window,
+                              lwm2m_coap_responder:options(),
+                              22
+                             ),
+            (time_now() - StartedAt) >= QModeTimeWind;
         false -> false
     end.
 

+ 1 - 1
apps/emqx_stomp/src/emqx_stomp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_stomp,
  [{description, "EMQ X Stomp Protocol Plugin"},
-  {vsn, "4.3.2"}, % strict semver, bump manually!
+  {vsn, "4.3.3"}, % strict semver, bump manually!
   {modules, []},
   {registered, [emqx_stomp_sup]},
   {applications, [kernel,stdlib]},

+ 8 - 2
apps/emqx_stomp/src/emqx_stomp.appup.src

@@ -1,11 +1,17 @@
 %% -*- mode: erlang -*-
 {VSN,
-  [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
+  [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",[
+     {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
+     {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
     [{restart_application,emqx_stomp},
      {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
    {<<".*">>,[]}],
-  [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
+  [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
+   {"4.3.1",[
+     {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
+     {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
    {"4.3.0",
     [{restart_application,emqx_stomp}]},
    {<<".*">>,[]}]}.

+ 48 - 20
apps/emqx_stomp/src/emqx_stomp_protocol.erl

@@ -108,6 +108,8 @@
                             , init/2
                             ]}).
 
+-elvis([{elvis_style, dont_repeat_yourself, disable}]).
+
 -type(pstate() :: #pstate{}).
 
 %% @doc Init protocol
@@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
 
     AllowAnonymous = get_value(allow_anonymous, Opts, false),
     DefaultUser = get_value(default_user, Opts),
-
-	#pstate{
+    #pstate{
        conninfo = NConnInfo,
        clientinfo = ClientInfo,
        heartfun = HeartFun,
@@ -165,7 +166,7 @@ default_conninfo(ConnInfo) ->
 info(State) ->
     maps:from_list(info(?INFO_KEYS, State)).
 
--spec info(list(atom())|atom(), pstate()) -> term().
+-spec info(list(atom()) | atom(), pstate()) -> term().
 info(Keys, State) when is_list(Keys) ->
     [{Key, info(Key, State)} || Key <- Keys];
 info(conninfo, #pstate{conninfo = ConnInfo}) ->
@@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true
 received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
     case header(<<"transaction">>, Headers) of
         undefined     -> {ok, handle_recv_send_frame(Frame, State)};
-        TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State)
+        TransactionId ->
+            add_action(TransactionId,
+                       {fun ?MODULE:handle_recv_send_frame/2, [Frame]},
+                       receipt_id(Headers),
+                       State
+                      )
     end;
 
 received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
@@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
 received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
     case header(<<"transaction">>, Headers) of
         undefined     -> {ok, handle_recv_ack_frame(Frame, State)};
-        TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State)
+        TransactionId ->
+            add_action(TransactionId,
+                       {fun ?MODULE:handle_recv_ack_frame/2, [Frame]},
+                       receipt_id(Headers),
+                       State)
     end;
 
 %% NACK
@@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
 received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
     case header(<<"transaction">>, Headers) of
         undefined     -> {ok, handle_recv_nack_frame(Frame, State)};
-        TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State)
+        TransactionId ->
+            add_action(TransactionId,
+                       {fun ?MODULE:handle_recv_nack_frame/2, [Frame]},
+                       receipt_id(Headers),
+                       State)
     end;
 
 %% BEGIN
@@ -516,9 +530,9 @@ negotiate_version(Accepts) ->
 
 negotiate_version(Ver, []) ->
     {error, <<"Supported protocol versions < ", Ver/binary>>};
-negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer ->
+negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer ->
     {ok, AcceptVer};
-negotiate_version(Ver, [_|T]) ->
+negotiate_version(Ver, [_ | T]) ->
     negotiate_version(Ver, T).
 
 check_login(Login, _, AllowAnonymous, _)
@@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) ->
 add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) ->
     case maps:get(Id, Trans, undefined) of
         {Ts, Actions} ->
-            NTrans = Trans#{Id => {Ts, [Action|Actions]}},
+            NTrans = Trans#{Id => {Ts, [Action | Actions]}},
             {ok, State#pstate{transaction = NTrans}};
         _ ->
             send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
@@ -588,15 +602,29 @@ next_ackid() ->
     put(ackid, AckId + 1),
     AckId.
 
-make_mqtt_message(Topic, Headers, Body) ->
-    Msg = emqx_message:make(stomp, Topic, Body),
-    Headers1 = lists:foldl(fun(Key, Headers0) ->
-                               proplists:delete(Key, Headers0)
-                           end, Headers, [<<"destination">>,
-                                          <<"content-length">>,
-                                          <<"content-type">>,
-                                          <<"transaction">>,
-                                          <<"receipt">>]),
+make_mqtt_message(Topic, Headers, Body,
+                  #pstate{
+                     conninfo = #{proto_ver := ProtoVer},
+                     clientinfo = #{
+                         protocol := Protocol,
+                         clientid := ClientId,
+                         username := Username,
+                         peerhost := PeerHost}}) ->
+    Msg = emqx_message:make(
+            ClientId, ?QOS_0,
+            Topic, Body, #{},
+            #{proto_ver => ProtoVer,
+              protocol => Protocol,
+              username => Username,
+              peerhost => PeerHost}),
+    Headers1 = lists:foldl(
+                 fun(Key, Headers0) ->
+                    proplists:delete(Key, Headers0)
+                 end, Headers, [<<"destination">>,
+                                <<"content-length">>,
+                                <<"content-type">>,
+                                <<"transaction">>,
+                                <<"receipt">>]),
     emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
 
 receipt_id(Headers) ->
@@ -611,7 +639,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod
         allow ->
             _ = maybe_send_receipt(receipt_id(Headers), State),
             _ = emqx_broker:publish(
-                make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
+                make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State)
             ),
             State;
         deny ->
@@ -699,7 +727,7 @@ find_sub_by_id(Id, Subs) ->
             end, Subs),
     case maps:to_list(Found) of
         [] -> undefined;
-        [Sub|_] -> Sub
+        [Sub | _] -> Sub
     end.
 
 is_acl_enabled(_) ->