Przeglądaj źródła

Merge pull request #7343 from HJianBo/fix-gw-bugs

JianBo He 3 lat temu
rodzic
commit
83e16a5bc5

+ 47 - 11
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -119,10 +119,10 @@ stats(_) ->
     [].
 
 -spec init(map(), map()) -> channel().
-init(ConnInfoT = #{peername := {PeerHost, _},
-                   sockname := {_, SockPort}},
+init(ConnInfo = #{peername := {PeerHost, _},
+                  sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
-    Peercert = maps:get(peercert, ConnInfoT, undefined),
+    Peercert = maps:get(peercert, ConnInfo, undefined),
     Mountpoint = maps:get(mountpoint, Config, <<>>),
     ListenerId = case maps:get(listener, Config, undefined) of
                      undefined -> undefined;
@@ -144,10 +144,6 @@ init(ConnInfoT = #{peername := {PeerHost, _},
                     }
                   ),
 
-    %% because it is possible to disconnect after init, and then trigger the
-    %% $event.disconnected hook and these two fields are required in the hook
-    ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>},
-
     Heartbeat = ?GET_IDLE_TIME(Config),
     #channel{ ctx = Ctx
             , conninfo = ConnInfo
@@ -405,6 +401,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
             {ok, Input, Channel}
     end.
 
+enrich_conninfo({Queries, _Msg},
+                Channel = #channel{
+                             keepalive = KeepAlive,
+                             conninfo = ConnInfo}) ->
+    case Queries of
+        #{<<"clientid">> := ClientId} ->
+            Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
+            NConnInfo = ConnInfo#{ clientid => ClientId
+                                 , proto_name => <<"CoAP">>
+                                 , proto_ver => <<"1">>
+                                 , clean_start => true
+                                 , keepalive => Interval
+                                 , expiry_interval => 0
+                                 },
+            {ok, Channel#channel{conninfo = NConnInfo}};
+        _ ->
+            {error, "invalid queries", Channel}
+    end.
+
 enrich_clientinfo({Queries, Msg},
                   Channel = #channel{clientinfo = ClientInfo0}) ->
     case Queries of
@@ -580,10 +595,12 @@ process_out(Outs, Result, Channel, _) ->
 process_nothing(_, _, Channel) ->
     {ok, Channel}.
 
-process_connection({open, Req}, Result, Channel, Iter) ->
+process_connection({open, Req}, Result,
+                   Channel = #channel{conn_state = idle}, Iter) ->
     Queries = emqx_coap_message:get_option(uri_query, Req),
     case emqx_misc:pipeline(
-           [ fun run_conn_hooks/2
+           [ fun enrich_conninfo/2
+           , fun run_conn_hooks/2
            , fun enrich_clientinfo/2
            , fun set_log_meta/2
            , fun auth_connect/2
@@ -594,12 +611,31 @@ process_connection({open, Req}, Result, Channel, Iter) ->
             process_connect(ensure_connected(NChannel), Req, Result, Iter);
         {error, ReasonCode, NChannel} ->
             ErrMsg = io_lib:format("Login Failed: ~ts", [ReasonCode]),
-            Payload = erlang:list_to_binary(lists:flatten(ErrMsg)),
+            Payload = iolist_to_binary(ErrMsg),
             iter(Iter,
                  reply({error, bad_request}, Payload, Req, Result),
                  NChannel)
     end;
-
+process_connection({open, Req}, Result,
+                   Channel = #channel{
+                                conn_state = ConnState,
+                                clientinfo = #{clientid := ClientId}}, Iter)
+  when ConnState == connected ->
+    Queries = emqx_coap_message:get_option(uri_query, Req),
+    ErrMsg0 =
+        case Queries of
+            #{<<"clientid">> := ClientId} ->
+                "client has connected";
+            #{<<"clientid">> := ReqClientId} ->
+                ["channel has registered by: ", ReqClientId];
+            _ ->
+                "invalid queries"
+        end,
+    ErrMsg = io_lib:format("Bad Request: ~ts", [ErrMsg0]),
+    Payload = iolist_to_binary(ErrMsg),
+    iter(Iter,
+         reply({error, bad_request}, Payload, Req, Result),
+         Channel);
 process_connection({close, Msg}, _, Channel, _) ->
     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
     {shutdown, close, Reply, Channel}.

+ 1 - 1
apps/emqx_gateway/src/coap/emqx_coap_impl.erl

@@ -83,7 +83,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
             logger:error("Failed to update ~ts; "
                          "reason: {~0p, ~0p} stacktrace: ~0p",
                          [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
+            {error, Reason}
     end.
 
 on_gateway_unload(_Gateway = #{ name := GwName,

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_api.erl

@@ -97,7 +97,7 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) ->
             ok ->
                 {204};
             {error, Reason} ->
-                return_http_error(400, Reason)
+                emqx_gateway_http:reason2resp(Reason)
         end
     end);
 gateway_insta(get, #{bindings := #{name := Name0}}) ->
@@ -134,7 +134,7 @@ gateway_insta(put, #{body := GwConf0,
             {ok, Gateway} ->
                 {200, Gateway};
             {error, Reason} ->
-                return_http_error(500, Reason)
+                emqx_gateway_http:reason2resp(Reason)
         end
     end).
 

+ 6 - 1
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -311,7 +311,12 @@ return_http_error(Code, Msg) ->
 
 -spec reason2msg({atom(), map()} | any()) -> error | string().
 reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
-    fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]);
+    NValue = case emqx_json:safe_encode(Value) of
+                 {ok, Str} -> Str;
+                 {error, _} -> emqx_gateway_utils:stringfy(Value)
+             end,
+    fmtstr("Bad config value '~s' for '~s', reason: ~s",
+           [NValue, Key, Reason]);
 reason2msg({badres, #{resource := gateway,
                       gateway := GwName,
                       reason := not_found}}) ->

+ 18 - 12
apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl

@@ -57,18 +57,19 @@ on_gateway_load(_Gateway = #{ name := GwName,
                               config := Config
                             }, Ctx) ->
     %% XXX: How to monitor it ?
-    %% Start grpc client pool & client channel
-    PoolName = pool_name(GwName),
-    PoolSize = emqx_vm:schedulers() * 2,
-    {ok, PoolSup} = emqx_pool_sup:start_link(
-                      PoolName, hash, PoolSize,
-                      {emqx_exproto_gcli, start_link, []}),
     _ = start_grpc_client_channel(GwName,
                                   maps:get(handler, Config, undefined)
                                  ),
     %% XXX: How to monitor it ?
     _ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
 
+    %% XXX: How to monitor it ?
+    PoolName = pool_name(GwName),
+    PoolSize = emqx_vm:schedulers() * 2,
+    {ok, PoolSup} = emqx_pool_sup:start_link(
+                      PoolName, hash, PoolSize,
+                      {emqx_exproto_gcli, start_link, []}),
+
     NConfig = maps:without(
                  [server, handler],
                  Config#{pool_name => PoolName}
@@ -103,7 +104,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
             logger:error("Failed to update ~ts; "
                          "reason: {~0p, ~0p} stacktrace: ~0p",
                          [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
+            {error, Reason}
     end.
 
 on_gateway_unload(_Gateway = #{ name := GwName,
@@ -141,8 +142,11 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
             console_print("Start ~ts gRPC server on ~p successfully.~n",
                           [GwName, ListenOn]);
         {error, Reason} ->
-            ?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~p",
-                  [GwName, ListenOn, Reason])
+            ?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~0p",
+                  [GwName, ListenOn, Reason]),
+            throw({badconf, #{key => server,
+                              value => Options,
+                              reason => illegal_grpc_server_confs}})
     end.
 
 stop_grpc_server(GwName) ->
@@ -162,14 +166,16 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
                                }})
 
         end,
-    case maps:to_list(maps:get(ssl, Options, #{})) of
-        [] ->
+    case emqx_map_lib:deep_get([ssl, enable], Options, false) of
+        false ->
             SvrAddr = compose_http_uri(http, Host, Port),
             grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
-        SslOpts ->
+        true ->
+            SslOpts = maps:to_list(maps:get(ssl, Options, #{})),
             ClientOpts = #{gun_opts =>
                            #{transport => ssl,
                              transport_opts => SslOpts}},
+
             SvrAddr = compose_http_uri(https, Host, Port),
             grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
     end.

+ 23 - 5
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl

@@ -108,10 +108,10 @@ info(ctx, #channel{ctx = Ctx}) ->
 stats(_) ->
     [].
 
-init(ConnInfoT = #{peername := {PeerHost, _},
-                   sockname := {_, SockPort}},
+init(ConnInfo = #{peername := {PeerHost, _},
+                  sockname := {_, SockPort}},
      #{ctx := Ctx} = Config) ->
-    Peercert = maps:get(peercert, ConnInfoT, undefined),
+    Peercert = maps:get(peercert, ConnInfo, undefined),
     Mountpoint = maps:get(mountpoint, Config, undefined),
     ListenerId = case maps:get(listener, Config, undefined) of
                      undefined -> undefined;
@@ -133,8 +133,6 @@ init(ConnInfoT = #{peername := {PeerHost, _},
                     }
                   ),
 
-    ConnInfo = ConnInfoT#{proto_name => <<"LwM2M">>, proto_ver => <<"0.0">>},
-
     #channel{ ctx = Ctx
             , conninfo = ConnInfo
             , clientinfo = ClientInfo
@@ -369,6 +367,7 @@ do_takeover(_DesireId, Msg, Channel) ->
 do_connect(Req, Result, Channel, Iter) ->
     case emqx_misc:pipeline(
            [ fun check_lwm2m_version/2
+           , fun enrich_conninfo/2
            , fun run_conn_hooks/2
            , fun enrich_clientinfo/2
            , fun set_log_meta/2
@@ -427,6 +426,25 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
             {ok, Input, Channel}
     end.
 
+enrich_conninfo(#coap_message{options = Options},
+                Channel = #channel{
+                             conninfo = ConnInfo}) ->
+    Query = maps:get(uri_query, Options, #{}),
+    case Query of
+        #{<<"ep">> := Epn, <<"lt">> := Lifetime} ->
+            ClientId = maps:get(<<"device_id">>, Query, Epn),
+            NConnInfo = ConnInfo#{ clientid => ClientId
+                                 , proto_name => <<"LwM2M">>
+                                 , proto_ver => <<"1.0.1">>
+                                 , clean_start => true
+                                 , keepalive => binary_to_integer(Lifetime)
+                                 , expiry_interval => 0
+                                 },
+            {ok, Channel#channel{conninfo = NConnInfo}};
+        _ ->
+            {error, "invalid queries", Channel}
+    end.
+
 enrich_clientinfo(#coap_message{options = Options} = Msg,
                   Channel = #channel{clientinfo = ClientInfo0}) ->
     Query = maps:get(uri_query, Options, #{}),

+ 1 - 1
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl

@@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
             logger:error("Failed to update ~ts; "
                          "reason: {~0p, ~0p} stacktrace: ~0p",
                          [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
+            {error, Reason}
     end.
 
 on_gateway_unload(_Gateway = #{ name := GwName,

+ 1 - 1
apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl

@@ -106,7 +106,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
             logger:error("Failed to update ~ts; "
                          "reason: {~0p, ~0p} stacktrace: ~0p",
                          [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
+            {error, Reason}
     end.
 
 on_gateway_unload(_Gateway = #{ name := GwName,

+ 9 - 14
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -512,8 +512,15 @@ handle_in(?PACKET(?CMD_ABORT, Headers),
         handle_out(receipt, receipt_id(Headers), NChannel)
     end);
 
-handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) ->
-    shutdown_with_recepit(normal, receipt_id(Headers), Channel);
+handle_in(?PACKET(?CMD_DISCONNECT, Headers),
+          Channel = #channel{conn_state = connected}) ->
+    Outgoings = case receipt_id(Headers) of
+                    undefined -> [{close, normal}];
+                    ReceiptId ->
+                        [{outgoing, receipt_frame(ReceiptId)},
+                         {close, normal}]
+                end,
+    {ok, Outgoings, Channel};
 
 handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
     ?SLOG(error, #{ msg => "unexpected_frame_error"
@@ -765,7 +772,6 @@ handle_info({sock_closed, Reason},
     %emqx_zone:enable_flapping_detect(Zone)
     %    andalso emqx_flapping:detect(ClientInfo),
     NChannel = ensure_disconnected(Reason, Channel),
-    %% XXX: Session keepper detect here
     shutdown(Reason, NChannel);
 
 handle_info({sock_closed, Reason},
@@ -918,20 +924,9 @@ reply(Reply, Channel) ->
 shutdown(Reason, Channel) ->
     {shutdown, Reason, Channel}.
 
-shutdown_with_recepit(Reason, ReceiptId, Channel) ->
-    case ReceiptId of
-        undefined ->
-            {shutdown, Reason, Channel};
-        _ ->
-            {shutdown, Reason, receipt_frame(ReceiptId), Channel}
-    end.
-
 shutdown(Reason, AckFrame, Channel) ->
     {shutdown, Reason, AckFrame, Channel}.
 
-%shutdown_and_reply(Reason, Reply, Channel) ->
-%    {shutdown, Reason, Reply, Channel}.
-
 shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
     {shutdown, Reason, Reply, OutPkt, Channel}.
 

+ 2 - 0
apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl

@@ -158,6 +158,8 @@ parse(<<?BSL, Ch:8, Rest/binary>>,
                                               Phase =:= hdvalue ->
     parse(Phase, Rest, acc(unescape(Ch), State));
 
+parse(<<?LF>>, Parser = #{phase := none}) ->
+    {more,  Parser};
 parse(Bytes, #{phase := none, state := State}) ->
     parse(command, Bytes, State).
 

+ 1 - 1
apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl

@@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
             logger:error("Failed to update ~ts; "
                          "reason: {~0p, ~0p} stacktrace: ~0p",
                          [GwName, Class, Reason, Stk]),
-            {error, {Class, Reason}}
+            {error, Reason}
     end.
 
 on_gateway_unload(_Gateway = #{ name := GwName,

+ 11 - 8
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -55,6 +55,8 @@
         ]).
 -endif.
 
+-elvis([{elvis_style, dont_repeat_yourself, disable}]).
+
 event_names() ->
     [ 'client.connected'
     , 'client.disconnected'
@@ -185,18 +187,18 @@ eventmsg_connected(_ClientInfo = #{
                     is_bridge := IsBridge,
                     mountpoint := Mountpoint
                    },
-                   _ConnInfo = #{
+                   ConnInfo = #{
                     peername := PeerName,
                     sockname := SockName,
                     clean_start := CleanStart,
                     proto_name := ProtoName,
                     proto_ver := ProtoVer,
-                    keepalive := Keepalive,
-                    connected_at := ConnectedAt,
-                    conn_props := ConnProps,
-                    receive_maximum := RcvMax,
-                    expiry_interval := ExpiryInterval
+                    connected_at := ConnectedAt
                    }) ->
+    Keepalive = maps:get(keepalive, ConnInfo, 0),
+    ConnProps = maps:get(conn_props, ConnInfo, #{}),
+    RcvMax = maps:get(receive_maximum, ConnInfo, 0),
+    ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
     with_basic_columns('client.connected',
         #{clientid => ClientId,
           username => Username,
@@ -407,7 +409,8 @@ event_info_message_dropped() ->
     event_info_common(
         'message.dropped',
         {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>},
-        {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
+        {<<"messages are discarded during routing, usually because there are no subscribers">>,
+         <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
         <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
     ).
 event_info_delivery_dropped() ->
@@ -415,7 +418,7 @@ event_info_delivery_dropped() ->
         'delivery.dropped',
         {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>},
         {<<"messages are discarded during delivery, i.e. because the message queue is full">>,
-        <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
+         <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>},
         <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">>
     ).
 event_info_client_connected() ->