|
|
@@ -0,0 +1,251 @@
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+-module(emqx_bridge_syskeeper_proxy_server).
|
|
|
+
|
|
|
+-behaviour(gen_statem).
|
|
|
+
|
|
|
+-include_lib("emqx/include/logger.hrl").
|
|
|
+
|
|
|
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
|
|
+
|
|
|
+%% `emqx_resource' API
|
|
|
+-export([
|
|
|
+ query_mode/1,
|
|
|
+ on_start/2,
|
|
|
+ on_stop/2,
|
|
|
+ on_get_status/2
|
|
|
+]).
|
|
|
+
|
|
|
+%% API
|
|
|
+-export([start_link/3]).
|
|
|
+
|
|
|
+%% gen_statem callbacks
|
|
|
+-export([callback_mode/0, init/1, terminate/3, code_change/4]).
|
|
|
+-export([handle_event/4]).
|
|
|
+
|
|
|
+-type state() :: wait_ready | handshake | running.
|
|
|
+-type data() :: #{
|
|
|
+ transport := atom(),
|
|
|
+ socket := inet:socket(),
|
|
|
+ frame_state :=
|
|
|
+ undefined
|
|
|
+ | emqx_bridge_sysk_frame:state(),
|
|
|
+ buffer := binary(),
|
|
|
+ conf := map()
|
|
|
+}.
|
|
|
+
|
|
|
+-define(DEFAULT_PORT, 9092).
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%% emqx_resource
|
|
|
+
|
|
|
+query_mode(_) ->
|
|
|
+ no_queries.
|
|
|
+
|
|
|
+on_start(
|
|
|
+ InstanceId,
|
|
|
+ #{
|
|
|
+ listen := Server,
|
|
|
+ acceptors := Acceptors
|
|
|
+ } = Config
|
|
|
+) ->
|
|
|
+ ?SLOG(info, #{
|
|
|
+ msg => "starting_syskeeper_connector",
|
|
|
+ connector => InstanceId,
|
|
|
+ config => Config
|
|
|
+ }),
|
|
|
+
|
|
|
+ #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
|
|
|
+ default_port => ?DEFAULT_PORT
|
|
|
+ }),
|
|
|
+ ListenOn = {Host, Port},
|
|
|
+
|
|
|
+ Options = [
|
|
|
+ {acceptors, Acceptors},
|
|
|
+ {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]}
|
|
|
+ ],
|
|
|
+ MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]},
|
|
|
+ ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn),
|
|
|
+
|
|
|
+ case esockd:open(?MODULE, ListenOn, Options, MFArgs) of
|
|
|
+ {ok, _} ->
|
|
|
+ {ok, #{listen_on => ListenOn}};
|
|
|
+ Error ->
|
|
|
+ Error
|
|
|
+ end.
|
|
|
+
|
|
|
+on_stop(InstanceId, _State) ->
|
|
|
+ ?SLOG(info, #{
|
|
|
+ msg => "stopping_syskeeper_connector",
|
|
|
+ connector => InstanceId
|
|
|
+ }),
|
|
|
+ case emqx_resource:get_allocated_resources(InstanceId) of
|
|
|
+ #{listen_on := ListenOn} ->
|
|
|
+ esockd:close(?MODULE, ListenOn);
|
|
|
+ _ ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
+on_get_status(_InstanceId, #{listen_on := ListenOn}) ->
|
|
|
+ try
|
|
|
+ _ = esockd:listener({?MODULE, ListenOn}),
|
|
|
+ connected
|
|
|
+ catch
|
|
|
+ _:_ ->
|
|
|
+ disconnected
|
|
|
+ end.
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+-spec start_link(atom(), inet:socket(), map()) ->
|
|
|
+ {ok, Pid :: pid()}
|
|
|
+ | ignore
|
|
|
+ | {error, Error :: term()}.
|
|
|
+start_link(Transport, Socket, Conf) ->
|
|
|
+ gen_statem:start_link(?MODULE, [Transport, Socket, Conf], []).
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%% gen_statem callbacks
|
|
|
+
|
|
|
+-spec callback_mode() -> gen_statem:callback_mode_result().
|
|
|
+callback_mode() -> handle_event_function.
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+-spec init(Args :: term()) ->
|
|
|
+ gen_statem:init_result(term()).
|
|
|
+init([Transport, Socket, Conf]) ->
|
|
|
+ {ok, wait_ready,
|
|
|
+ #{
|
|
|
+ transport => Transport,
|
|
|
+ socket => Socket,
|
|
|
+ conf => Conf,
|
|
|
+ buffer => <<>>,
|
|
|
+ frame_state => undefined
|
|
|
+ },
|
|
|
+ {next_event, internal, wait_ready}}.
|
|
|
+
|
|
|
+handle_event(internal, wait_ready, wait_ready, Data) ->
|
|
|
+ wait_ready(Data);
|
|
|
+handle_event(state_timeout, handshake_timeout, handshake, _Data) ->
|
|
|
+ %% ?LOG(error, "Handshake tiemout~n", []),
|
|
|
+ {stop, normal};
|
|
|
+handle_event(internal, try_parse, running, Data) ->
|
|
|
+ try_parse(running, Data);
|
|
|
+handle_event(info, {tcp, _Socket, Bin}, State, Data) ->
|
|
|
+ try_parse(State, combine_buffer(Bin, Data));
|
|
|
+handle_event(info, {tcp_closed, _}, _State, _Data) ->
|
|
|
+ {stop, normal};
|
|
|
+handle_event(info, {tcp_error, _, _Reason}, _State, _Data) ->
|
|
|
+ %% ?LOG(warning, "TCP error, reason:~p~n", [Reason]),
|
|
|
+ {stop, normal};
|
|
|
+handle_event(_Event, _Content, _State, _Data) ->
|
|
|
+ %% ?LOG(warning, "Unexpected event:~p, Context:~p, State:~p~n", [Event, Content, State]),
|
|
|
+ keep_state_and_data.
|
|
|
+
|
|
|
+-spec terminate(Reason :: term(), State :: state(), Data :: data()) ->
|
|
|
+ any().
|
|
|
+terminate(_Reason, _State, _Data) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
+code_change(_OldVsn, State, Data, _Extra) ->
|
|
|
+ {ok, State, Data}.
|
|
|
+
|
|
|
+%% -------------------------------------------------------------------------------------------------
|
|
|
+%%% Internal functions
|
|
|
+send(#{transport := Transport, socket := Socket}, Bin) ->
|
|
|
+ Transport:send(Socket, Bin).
|
|
|
+
|
|
|
+ack(Data) ->
|
|
|
+ ack(Data, true).
|
|
|
+
|
|
|
+ack(Data, false) ->
|
|
|
+ send(Data, <<0>>);
|
|
|
+ack(Data, true) ->
|
|
|
+ send(Data, <<16#FF>>).
|
|
|
+
|
|
|
+wait_ready(
|
|
|
+ #{
|
|
|
+ transport := Transport,
|
|
|
+ socket := RawSocket,
|
|
|
+ conf := #{handshake_timeout := Timeout}
|
|
|
+ } =
|
|
|
+ Data
|
|
|
+) ->
|
|
|
+ case Transport:wait(RawSocket) of
|
|
|
+ {ok, Socket} ->
|
|
|
+ Transport:setopts(Socket, [{active, true}]),
|
|
|
+ {next_state, handshake,
|
|
|
+ Data#{
|
|
|
+ socket => Socket,
|
|
|
+ frame_state => undefined
|
|
|
+ },
|
|
|
+ {state_timeout, Timeout, handshake_timeout}};
|
|
|
+ {error, Reason} ->
|
|
|
+ ok = Transport:fast_close(RawSocket),
|
|
|
+ {stop, Reason}
|
|
|
+ end.
|
|
|
+
|
|
|
+combine_buffer(Bin, #{buffer := Buffer} = Data) ->
|
|
|
+ Data#{buffer := <<Buffer/binary, Bin/binary>>}.
|
|
|
+
|
|
|
+try_parse(State, #{buffer := Bin} = Data) ->
|
|
|
+ case emqx_bridge_syskeeper_frame:parse_variable_byte_integer(Bin) of
|
|
|
+ {ok, Len, Rest} ->
|
|
|
+ case Rest of
|
|
|
+ <<Payload:Len/binary, Rest2/binary>> ->
|
|
|
+ Data2 = Data#{buffer := Rest2},
|
|
|
+ Result = parse(Payload, Data2),
|
|
|
+ handle_parse_result(Result, State, Data2);
|
|
|
+ _ ->
|
|
|
+ {keep_state, Data}
|
|
|
+ end;
|
|
|
+ {error, incomplete} ->
|
|
|
+ {keep_state, Data};
|
|
|
+ {error, _Reason} ->
|
|
|
+ %% ?LOG(warning, "Parse error, reason:~p, buffer:~p~n", [Reason, Bin]),
|
|
|
+ {stop, parse_error}
|
|
|
+ end.
|
|
|
+
|
|
|
+%% maybe handshake
|
|
|
+parse(Bin, #{frame_state := undefined}) ->
|
|
|
+ emqx_bridge_syskeeper_frame:parse_handshake(Bin);
|
|
|
+parse(Bin, #{frame_state := State}) ->
|
|
|
+ emqx_bridge_syskeeper_frame:parse(Bin, State).
|
|
|
+
|
|
|
+do_forward(Ack, Messages, Data) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(Message) ->
|
|
|
+ Msg = emqx_message:from_map(Message#{headers => #{}, extra => #{}}),
|
|
|
+ _ = emqx_broker:safe_publish(Msg)
|
|
|
+ end,
|
|
|
+ Messages
|
|
|
+ ),
|
|
|
+ case Ack of
|
|
|
+ true ->
|
|
|
+ ack(Data);
|
|
|
+ _ ->
|
|
|
+ ok
|
|
|
+ end.
|
|
|
+
|
|
|
+handle_parse_result({ok, Msg}, State, Data) ->
|
|
|
+ handle_packet(Msg, State, Data);
|
|
|
+handle_parse_result({error, _Reason} = Error, State, Data) ->
|
|
|
+ handle_parse_error(Error, State, #{buffer := _Bin} = Data),
|
|
|
+ %% ?LOG(warning, "Parse error, state:~p, reason:~p, buffer:~p~n", [State, Reason, Bin]),
|
|
|
+ {stop, parse_error}.
|
|
|
+
|
|
|
+handle_parse_error(_, handshake, Data) ->
|
|
|
+ ack(Data, false);
|
|
|
+handle_parse_error(_, _, _) ->
|
|
|
+ ok.
|
|
|
+
|
|
|
+handle_packet({FrameState, _Shake}, handshake, Data) ->
|
|
|
+ ack(Data),
|
|
|
+ {next_state, running, Data#{frame_state := FrameState}, {next_event, internal, try_parse}};
|
|
|
+handle_packet(#{type := forward, ack := Ack, messages := Messages}, running, Data) ->
|
|
|
+ do_forward(Ack, Messages, Data),
|
|
|
+ try_parse(running, Data);
|
|
|
+handle_packet(#{type := heartbeat}, running, Data) ->
|
|
|
+ ack(Data),
|
|
|
+ try_parse(running, Data).
|