Просмотр исходного кода

fix(gw): enrich conninfo for coap&lwm2m

JianBo He 4 лет назад
Родитель
Сommit
5682dcb72e

+ 24 - 8
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -119,10 +119,10 @@ stats(_) ->
     [].
 
 -spec init(map(), map()) -> channel().
-init(ConnInfoT = #{peername := {PeerHost, _},
-                   sockname := {_, SockPort}},
+init(ConnInfo = #{peername := {PeerHost, _},
+                  sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
-    Peercert = maps:get(peercert, ConnInfoT, undefined),
+    Peercert = maps:get(peercert, ConnInfo, undefined),
     Mountpoint = maps:get(mountpoint, Config, <<>>),
     ListenerId = case maps:get(listener, Config, undefined) of
                      undefined -> undefined;
@@ -144,10 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _},
                     }
                   ),
 
-    %% because it is possible to disconnect after init, and then trigger the
-    %% $event.disconnected hook and these two fields are required in the hook
-    ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>},
-
     Heartbeat = ?GET_IDLE_TIME(Config),
     #channel{ ctx = Ctx
             , conninfo = ConnInfo
@@ -405,6 +401,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
             {ok, Input, Channel}
     end.
 
+enrich_conninfo({Queries, _Msg},
+                Channel = #channel{
+                             keepalive = KeepAlive,
+                             conninfo = ConnInfo}) ->
+    case Queries of
+        #{<<"clientid">> := ClientId} ->
+            Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
+            NConnInfo = ConnInfo#{ clientid => ClientId
+                                 , proto_name => <<"CoAP">>
+                                 , proto_ver => <<"1">>
+                                 , clean_start => true
+                                 , keepalive => Interval
+                                 , expiry_interval => 0
+                                 },
+            {ok, Channel#channel{conninfo = NConnInfo}};
+        _ ->
+            {error, "invalid queries", Channel}
+    end.
+
 enrich_clientinfo({Queries, Msg},
                   Channel = #channel{clientinfo = ClientInfo0}) ->
     case Queries of
@@ -583,7 +598,8 @@ process_nothing(_, _, Channel) ->
 process_connection({open, Req}, Result, Channel, Iter) ->
     Queries = emqx_coap_message:get_option(uri_query, Req),
     case emqx_misc:pipeline(
-           [ fun run_conn_hooks/2
+           [ fun enrich_conninfo/2
+           , fun run_conn_hooks/2
            , fun enrich_clientinfo/2
            , fun set_log_meta/2
            , fun auth_connect/2

+ 23 - 5
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl

@@ -108,10 +108,10 @@ info(ctx, #channel{ctx = Ctx}) ->
 stats(_) ->
     [].
 
-init(ConnInfoT = #{peername := {PeerHost, _},
-                   sockname := {_, SockPort}},
+init(ConnInfo = #{peername := {PeerHost, _},
+                  sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
-    Peercert = maps:get(peercert, ConnInfoT, undefined),
+    Peercert = maps:get(peercert, ConnInfo, undefined),
     Mountpoint = maps:get(mountpoint, Config, undefined),
     ListenerId = case maps:get(listener, Config, undefined) of
                      undefined -> undefined;
@@ -133,8 +133,6 @@ init(ConnInfoT = #{peername := {PeerHost, _},
                     }
                   ),
 
-    ConnInfo = ConnInfoT#{proto_name => <<"LwM2M">>, proto_ver => <<"0.0">>},
-
     #channel{ ctx = Ctx
             , conninfo = ConnInfo
             , clientinfo = ClientInfo
@@ -369,6 +367,7 @@ do_takeover(_DesireId, Msg, Channel) ->
 do_connect(Req, Result, Channel, Iter) ->
     case emqx_misc:pipeline(
            [ fun check_lwm2m_version/2
+           , fun enrich_conninfo/2
            , fun run_conn_hooks/2
            , fun enrich_clientinfo/2
            , fun set_log_meta/2
@@ -427,6 +426,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
             {ok, Input, Channel}
     end.
 
+enrich_conninfo(#coap_message{options = Options},
+                Channel = #channel{
+                             conninfo = ConnInfo}) ->
+    Query = maps:get(uri_query, Options, #{}),
+    case Query of
+        #{<<"ep">> := Epn, <<"lt">> := Lifetime} ->
+            ClientId = maps:get(<<"device_id">>, Query, Epn),
+            NConnInfo = ConnInfo#{ clientid => ClientId
+                                 , proto_name => <<"LwM2M">>
+                                 , proto_ver => <<"1.0.1">>
+                                 , clean_start => true
+                                 , keepalive => binary_to_integer(Lifetime)
+                                 , expiry_interval => 0
+                                 },
+            {ok, Channel#channel{conninfo = NConnInfo}};
+        _ ->
+            {error, "invalid queries", Channel}
+    end.
+
 enrich_clientinfo(#coap_message{options = Options} = Msg,
                   Channel = #channel{clientinfo = ClientInfo0}) ->
     Query = maps:get(uri_query, Options, #{}),

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -185,7 +185,7 @@ eventmsg_connected(_ClientInfo = #{
                     is_bridge := IsBridge,
                     mountpoint := Mountpoint
                    },
-                   _ConnInfo = #{
+                   ConnInfo = #{
                     peername := PeerName,
                     sockname := SockName,
                     clean_start := CleanStart,