|
|
@@ -45,9 +45,10 @@ start_listener(ListenerId) ->
|
|
|
-spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
|
|
|
start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
|
|
|
case do_start_listener(ZoneName, ListenerName, Conf) of
|
|
|
- {error, listener_disabled} ->
|
|
|
- console_print("- Skip - starting ~s listener ~s on ~s ~n",
|
|
|
- [Type, listener_id(ZoneName, ListenerName), format(Bind)]);
|
|
|
+ {ok, {skipped, Reason}} when Reason =:= listener_disabled;
|
|
|
+ Reason =:= quic_app_missing ->
|
|
|
+ console_print("- Skip - starting ~s listener ~s on ~s ~n due to ~p",
|
|
|
+ [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]);
|
|
|
{ok, _} ->
|
|
|
console_print("Start ~s listener ~s on ~s successfully.~n",
|
|
|
[Type, listener_id(ZoneName, ListenerName), format(Bind)]);
|
|
|
@@ -68,7 +69,7 @@ console_print(_Fmt, _Args) -> ok.
|
|
|
-spec(do_start_listener(atom(), atom(), map())
|
|
|
-> {ok, pid() | {skipped, atom()}} | {error, term()}).
|
|
|
do_start_listener(_ZoneName, _ListenerName, #{enabled := false}) ->
|
|
|
- {error, listener_disabled};
|
|
|
+ {ok, {skipped, listener_disabled}};
|
|
|
do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) ->
|
|
|
esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)),
|
|
|
{emqx_connection, start_link,
|
|
|
@@ -88,25 +89,30 @@ do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts
|
|
|
|
|
|
%% Start MQTT/QUIC listener
|
|
|
do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Opts) ->
|
|
|
- %% @fixme unsure why we need reopen lib and reopen config.
|
|
|
- quicer_nif:open_lib(),
|
|
|
- quicer_nif:reg_open(),
|
|
|
- DefAcceptors = erlang:system_info(schedulers_online) * 8,
|
|
|
- ListenOpts = [ {cert, maps:get(certfile, Opts)}
|
|
|
- , {key, maps:get(keyfile, Opts)}
|
|
|
- , {alpn, ["mqtt"]}
|
|
|
- , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
|
|
|
- , {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])}
|
|
|
- ],
|
|
|
- ConnectionOpts = #{conn_callback => emqx_quic_connection
|
|
|
- , peer_unidi_stream_count => 1
|
|
|
- , peer_bidi_stream_count => 10
|
|
|
- , zone => ZoneName
|
|
|
- , listener => ListenerName
|
|
|
- },
|
|
|
- StreamOpts = [],
|
|
|
- quicer:start_listener(listener_id(ZoneName, ListenerName),
|
|
|
- port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts}).
|
|
|
+ case [ A || {quicer, _, _} = A<-application:which_applications() ] of
|
|
|
+ [_] ->
|
|
|
+ %% @fixme unsure why we need reopen lib and reopen config.
|
|
|
+ quicer_nif:open_lib(),
|
|
|
+ quicer_nif:reg_open(),
|
|
|
+ DefAcceptors = erlang:system_info(schedulers_online) * 8,
|
|
|
+ ListenOpts = [ {cert, maps:get(certfile, Opts)}
|
|
|
+ , {key, maps:get(keyfile, Opts)}
|
|
|
+ , {alpn, ["mqtt"]}
|
|
|
+ , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
|
|
|
+ , {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])}
|
|
|
+ ],
|
|
|
+ ConnectionOpts = #{conn_callback => emqx_quic_connection
|
|
|
+ , peer_unidi_stream_count => 1
|
|
|
+ , peer_bidi_stream_count => 10
|
|
|
+ , zone => ZoneName
|
|
|
+ , listener => ListenerName
|
|
|
+ },
|
|
|
+ StreamOpts = [],
|
|
|
+ quicer:start_listener(listener_id(ZoneName, ListenerName),
|
|
|
+ port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts});
|
|
|
+ [] ->
|
|
|
+ {ok, {skipped, quic_app_missing}}
|
|
|
+ end.
|
|
|
|
|
|
esockd_opts(Opts0) ->
|
|
|
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|