Просмотр исходного кода

Merge pull request #12892 from HJianBo/fix-gateway-related-issues

fix(ocpp): avoid an error log in handling downstream messages
JianBo He 1 год назад
Родитель
Сommit
d85df14b85

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.1.31"},
+    {vsn, "0.1.32"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]},

+ 125 - 22
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -247,9 +247,10 @@ page_params(Qs) ->
 get_cluster_listeners_info(GwName) ->
     Listeners = emqx_gateway_conf:listeners(GwName),
     ListenOns = lists:map(
-        fun(#{id := Id} = Conf) ->
+        fun(#{id := Id, type := Type0} = Conf) ->
+            Type = binary_to_existing_atom(Type0),
             ListenOn = emqx_gateway_conf:get_bind(Conf),
-            {Id, ListenOn}
+            {Type, Id, ListenOn}
         end,
         Listeners
     ),
@@ -293,17 +294,11 @@ listeners_cluster_status(Listeners) ->
 do_listeners_cluster_status(Listeners) ->
     Node = node(),
     lists:foldl(
-        fun({Id, ListenOn}, Acc) ->
-            BinId = erlang:atom_to_binary(Id),
-            {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
-            {Running, Curr} =
-                try esockd:get_current_connections({Id, ListenOn}) of
-                    Int -> {true, Int}
-                catch
-                    %% not started
-                    error:not_found ->
-                        {false, 0}
-                end,
+        fun({Type, Id, ListenOn}, Acc) ->
+            {Running, Curr} = current_listener_status(Type, Id, ListenOn),
+            {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(
+                erlang:atom_to_binary(Id)
+            ),
             Acc#{
                 Id => #{
                     node => Node,
@@ -319,6 +314,24 @@ do_listeners_cluster_status(Listeners) ->
         Listeners
     ).
 
+current_listener_status(Type, Id, _ListenOn) when Type =:= ws; Type =:= wss ->
+    Info = ranch:info(Id),
+    Conns = proplists:get_value(all_connections, Info, 0),
+    Running =
+        case proplists:get_value(status, Info) of
+            running -> true;
+            _ -> false
+        end,
+    {Running, Conns};
+current_listener_status(_Type, Id, ListenOn) ->
+    try esockd:get_current_connections({Id, ListenOn}) of
+        Int -> {true, Int}
+    catch
+        %% not started
+        error:not_found ->
+            {false, 0}
+    end.
+
 ensure_integer_or_infinity(infinity) ->
     infinity;
 ensure_integer_or_infinity(<<"infinity">>) ->
@@ -762,9 +775,9 @@ examples_listener() ->
                                     <<"tlsv1.1">>,
                                     <<"tlsv1">>
                                 ],
-                                cacertfile => <<"/etc/emqx/certs/cacert.pem">>,
-                                certfile => <<"/etc/emqx/certs/cert.pem">>,
-                                keyfile => <<"/etc/emqx/certs/key.pem">>,
+                                cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>,
+                                certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>,
+                                keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>,
                                 verify => <<"verify_none">>,
                                 fail_if_no_peer_cert => false
                             },
@@ -808,9 +821,9 @@ examples_listener() ->
                         dtls_options =>
                             #{
                                 versions => [<<"dtlsv1.2">>, <<"dtlsv1">>],
-                                cacertfile => <<"/etc/emqx/certs/cacert.pem">>,
-                                certfile => <<"/etc/emqx/certs/cert.pem">>,
-                                keyfile => <<"/etc/emqx/certs/key.pem">>,
+                                cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>,
+                                certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>,
+                                keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>,
                                 verify => <<"verify_none">>,
                                 fail_if_no_peer_cert => false
                             },
@@ -835,9 +848,9 @@ examples_listener() ->
                         dtls_options =>
                             #{
                                 versions => [<<"dtlsv1.2">>, <<"dtlsv1">>],
-                                cacertfile => <<"/etc/emqx/certs/cacert.pem">>,
-                                certfile => <<"/etc/emqx/certs/cert.pem">>,
-                                keyfile => <<"/etc/emqx/certs/key.pem">>,
+                                cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>,
+                                certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>,
+                                keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>,
                                 verify => <<"verify_none">>,
                                 user_lookup_fun => <<"emqx_tls_psk:lookup">>,
                                 ciphers =>
@@ -869,5 +882,95 @@ examples_listener() ->
                                 user_id_type => <<"username">>
                             }
                     }
+            },
+        ws_listener =>
+            #{
+                summary => <<"A simple WebSocket listener example">>,
+                value =>
+                    #{
+                        name => <<"ws-def">>,
+                        type => <<"ws">>,
+                        bind => <<"33043">>,
+                        acceptors => 16,
+                        max_connections => 1024000,
+                        max_conn_rate => 1000,
+                        websocket =>
+                            #{
+                                path => <<"/ocpp">>,
+                                fail_if_no_subprotocol => true,
+                                supported_subprotocols => <<"ocpp1.6">>,
+                                check_origin_enable => false,
+                                check_origins =>
+                                    <<"http://localhost:18083, http://127.0.0.1:18083">>,
+                                compress => false,
+                                piggyback => <<"single">>
+                            },
+                        tcp_options =>
+                            #{
+                                active_n => 100,
+                                backlog => 1024,
+                                send_timeout => <<"15s">>,
+                                send_timeout_close => true,
+                                recbuf => <<"10KB">>,
+                                sndbuf => <<"10KB">>,
+                                buffer => <<"10KB">>,
+                                high_watermark => <<"1MB">>,
+                                nodelay => false,
+                                reuseaddr => true,
+                                keepalive => "none"
+                            }
+                    }
+            },
+        wss_listener =>
+            #{
+                summary => <<"A simple WebSocket/TLS listener example">>,
+                value =>
+                    #{
+                        name => <<"ws-ssl-def">>,
+                        type => <<"wss">>,
+                        bind => <<"33053">>,
+                        acceptors => 16,
+                        max_connections => 1024000,
+                        max_conn_rate => 1000,
+                        websocket =>
+                            #{
+                                path => <<"/ocpp">>,
+                                fail_if_no_subprotocol => true,
+                                supported_subprotocols => <<"ocpp1.6">>,
+                                check_origin_enable => false,
+                                check_origins =>
+                                    <<"http://localhost:18083, http://127.0.0.1:18083">>,
+                                compress => false,
+                                piggyback => <<"single">>
+                            },
+                        ssl_options =>
+                            #{
+                                versions => [
+                                    <<"tlsv1.3">>,
+                                    <<"tlsv1.2">>,
+                                    <<"tlsv1.1">>,
+                                    <<"tlsv1">>
+                                ],
+                                cacertfile => <<"${EMQX_ETC_DIR}/certs/cacert.pem">>,
+                                certfile => <<"${EMQX_ETC_DIR}/certs/cert.pem">>,
+                                keyfile => <<"${EMQX_ETC_DIR}/certs/key.pem">>,
+                                verify => <<"verify_none">>,
+                                fail_if_no_peer_cert => false
+                            },
+                        tcp_options =>
+                            #{
+                                active_n => 100,
+                                backlog => 1024,
+                                send_timeout => <<"15s">>,
+                                send_timeout_close => true,
+                                recbuf => <<"10KB">>,
+                                sndbuf => <<"10KB">>,
+                                buffer => <<"10KB">>,
+                                high_watermark => <<"1MB">>,
+                                nodelay => false,
+                                reuseaddr => true,
+                                keepalive => "none"
+                            }
+                    }
             }
     }.

+ 2 - 3
apps/emqx_gateway_ocpp/include/emqx_ocpp.hrl

@@ -86,10 +86,9 @@
 -define(IS_ERROR(F), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR}).
 -define(IS_ERROR(F, Id), F = #{type := ?OCPP_MSG_TYPE_ID_CALLERROR, id := Id}).
 
--define(IS_BootNotification_RESP(Payload), #{
+-define(IS_BootNotification_RESP(Status, Interval), #{
     type := ?OCPP_MSG_TYPE_ID_CALLRESULT,
-    action := ?OCPP_ACT_BootNotification,
-    payload := Payload
+    payload := #{<<"status">> := Status, <<"interval">> := Interval}
 }).
 
 -define(ERR_FRAME(Id, Code, Desc), #{

+ 1 - 1
apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_ocpp, [
     {description, "OCPP-J 1.6 Gateway for EMQX"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]},
     {env, []},

+ 13 - 18
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -527,20 +527,19 @@ apply_frame(Frames, Channel) when is_list(Frames) ->
     {Outgoings, NChannel} = lists:foldl(fun do_apply_frame/2, {[], Channel}, Frames),
     {lists:reverse(Outgoings), NChannel};
 apply_frame(Frames, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames, channel => Channel}),
+    ?SLOG(error, #{msg => "unexpected_frame_list", frames => Frames}),
     Channel.
 
-do_apply_frame(?IS_BootNotification_RESP(Payload), {Outgoings, Channel}) ->
-    case maps:get(<<"status">>, Payload) of
+do_apply_frame(?IS_BootNotification_RESP(Status, Interval), {Outgoings, Channel}) ->
+    case Status of
         <<"Accepted">> ->
-            Intv = maps:get(<<"interval">>, Payload),
-            ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Intv}),
-            {[{event, updated} | Outgoings], reset_keepalive(Intv, Channel)};
+            ?SLOG(info, #{msg => "adjust_heartbeat_timer", new_interval_s => Interval}),
+            {[{event, updated} | Outgoings], reset_keepalive(Interval, Channel)};
         _ ->
             {Outgoings, Channel}
     end;
-do_apply_frame(Frame, Acc = {_Outgoings, Channel}) ->
-    ?SLOG(error, #{msg => "unexpected_frame", frame => Frame, channel => Channel}),
+do_apply_frame(Frame, Acc = {_Outgoings, _Channel}) ->
+    ?SLOG(info, #{msg => "skip_to_apply_frame", frame => Frame}),
     Acc.
 
 %%--------------------------------------------------------------------
@@ -762,19 +761,15 @@ payload2frame(#{
         action => Action,
         payload => Payload
     };
-payload2frame(
-    MqttPayload =
-        #{
-            <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT,
-            <<"UniqueId">> := Id,
-            <<"Payload">> := Payload
-        }
-) ->
-    Action = maps:get(<<"Action">>, MqttPayload, undefined),
+payload2frame(#{
+    <<"MessageTypeId">> := ?OCPP_MSG_TYPE_ID_CALLRESULT,
+    <<"UniqueId">> := Id,
+    <<"Payload">> := Payload
+}) ->
     #{
         type => ?OCPP_MSG_TYPE_ID_CALLRESULT,
         id => Id,
-        action => Action,
+        action => undefined,
         payload => Payload
     };
 payload2frame(#{

+ 134 - 0
apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_ocpp_SUITE).
 
+-include("emqx_ocpp.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -145,3 +146,136 @@ t_enable_disable_gw_ocpp(_Config) ->
     AssertEnabled(false),
     ?assertEqual({204, #{}}, request(put, "/gateways/ocpp/enable/true", <<>>)),
     AssertEnabled(true).
+
+t_adjust_keepalive_timer(_Config) ->
+    {ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>),
+    UniqueId = <<"3335862321">>,
+    BootNotification = #{
+        id => UniqueId,
+        type => ?OCPP_MSG_TYPE_ID_CALL,
+        action => <<"BootNotification">>,
+        payload => #{
+            <<"chargePointVendor">> => <<"vendor1">>,
+            <<"chargePointModel">> => <<"model1">>
+        }
+    },
+    ok = send_msg(ClientPid, BootNotification),
+    %% check the default keepalive timer
+    timer:sleep(1000),
+    ?assertMatch(
+        #{conninfo := #{keepalive := 60}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)
+    ),
+    %% publish the BootNotification.ack
+    AckPayload = emqx_utils_json:encode(#{
+        <<"MessageTypeId">> => ?OCPP_MSG_TYPE_ID_CALLRESULT,
+        <<"UniqueId">> => UniqueId,
+        <<"Payload">> => #{
+            <<"currentTime">> => "2023-06-21T14:20:39+00:00",
+            <<"interval">> => 300,
+            <<"status">> => <<"Accepted">>
+        }
+    }),
+    _ = emqx:publish(emqx_message:make(<<"ocpp/cs/client1">>, AckPayload)),
+    {ok, _Resp} = receive_msg(ClientPid),
+    %% assert: check the keepalive timer is adjusted
+    ?assertMatch(
+        #{conninfo := #{keepalive := 300}}, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)
+    ),
+    %% close conns
+    close(ClientPid),
+    timer:sleep(1000),
+    %% assert:
+    ?assertEqual(undefined, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)),
+    ok.
+
+t_listeners_status(_Config) ->
+    {200, [Listener]} = request(get, "/gateways/ocpp/listeners"),
+    ?assertMatch(
+        #{
+            status := #{running := true, current_connections := 0}
+        },
+        Listener
+    ),
+    %% add a connection
+    {ok, ClientPid} = connect("127.0.0.1", 33033, <<"client1">>),
+    UniqueId = <<"3335862321">>,
+    BootNotification = #{
+        id => UniqueId,
+        type => ?OCPP_MSG_TYPE_ID_CALL,
+        action => <<"BootNotification">>,
+        payload => #{
+            <<"chargePointVendor">> => <<"vendor1">>,
+            <<"chargePointModel">> => <<"model1">>
+        }
+    },
+    ok = send_msg(ClientPid, BootNotification),
+    timer:sleep(1000),
+    %% assert: the current_connections is 1
+    {200, [Listener1]} = request(get, "/gateways/ocpp/listeners"),
+    ?assertMatch(
+        #{
+            status := #{running := true, current_connections := 1}
+        },
+        Listener1
+    ),
+    %% close conns
+    close(ClientPid),
+    timer:sleep(1000),
+    %% assert: the current_connections is 0
+    {200, [Listener2]} = request(get, "/gateways/ocpp/listeners"),
+    ?assertMatch(
+        #{
+            status := #{running := true, current_connections := 0}
+        },
+        Listener2
+    ).
+
+%%--------------------------------------------------------------------
+%% ocpp simple client
+
+connect(Host, Port, ClientId) ->
+    Timeout = 5000,
+    ConnOpts = #{connect_timeout => 5000},
+    case gun:open(Host, Port, ConnOpts) of
+        {ok, ConnPid} ->
+            {ok, _} = gun:await_up(ConnPid, Timeout),
+            case upgrade(ConnPid, ClientId, Timeout) of
+                {ok, _Headers} -> {ok, ConnPid};
+                Error -> Error
+            end;
+        Error ->
+            Error
+    end.
+
+upgrade(ConnPid, ClientId, Timeout) ->
+    Path = binary_to_list(<<"/ocpp/", ClientId/binary>>),
+    WsHeaders = [{<<"cache-control">>, <<"no-cache">>}],
+    StreamRef = gun:ws_upgrade(ConnPid, Path, WsHeaders, #{protocols => [{<<"ocpp1.6">>, gun_ws_h}]}),
+    receive
+        {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], Headers} ->
+            {ok, Headers};
+        {gun_response, ConnPid, _, _, Status, Headers} ->
+            {error, {ws_upgrade_failed, Status, Headers}};
+        {gun_error, ConnPid, StreamRef, Reason} ->
+            {error, {ws_upgrade_failed, Reason}}
+    after Timeout ->
+        {error, timeout}
+    end.
+
+send_msg(ConnPid, Frame) when is_map(Frame) ->
+    Opts = emqx_ocpp_frame:serialize_opts(),
+    Msg = emqx_ocpp_frame:serialize_pkt(Frame, Opts),
+    gun:ws_send(ConnPid, {text, Msg}).
+
+receive_msg(ConnPid) ->
+    receive
+        {gun_ws, ConnPid, _Ref, {_Type, Msg}} ->
+            ParseState = emqx_ocpp_frame:initial_parse_state(#{}),
+            {ok, Frame, _Rest, _NewParseStaet} = emqx_ocpp_frame:parse(Msg, ParseState),
+            {ok, Frame}
+    after 5000 ->
+        {error, timeout}
+    end.
+
+close(ConnPid) ->
+    gun:shutdown(ConnPid).

+ 3 - 0
changes/ee/fix-12892.md

@@ -0,0 +1,3 @@
+Fix an error in OCPP gateway's handling of downstream BootNotification.
+
+Fix the `gateways/ocpp/listeners` endpoint to return the correct number of current connections.