Просмотр исходного кода

Merge pull request #10598 from HJianBo/refactor_exproto_stream

refactor(exproto): support unary handler
JianBo He 2 лет назад
Родитель
Сommit
c67135dd4b

+ 1 - 1
apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl

@@ -249,7 +249,7 @@ t_case_stomp(_) ->
 t_case_exproto(_) ->
     Mod = emqx_exproto_SUITE,
     SvrMod = emqx_exproto_echo_svr,
-    Svrs = SvrMod:start(),
+    Svrs = SvrMod:start(http),
     Login = fun(Username, Password, Expect) ->
         with_resource(
             ?FUNCTOR(Mod:open(tcp)),

+ 2 - 2
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -332,7 +332,7 @@ t_case_sn_subscribe(_) ->
 t_case_exproto_publish(_) ->
     Mod = emqx_exproto_SUITE,
     SvrMod = emqx_exproto_echo_svr,
-    Svrs = SvrMod:start(),
+    Svrs = SvrMod:start(http),
     Payload = <<"publish with authz">>,
     Publish = fun(Topic, Checker) ->
         with_resource(
@@ -369,7 +369,7 @@ t_case_exproto_publish(_) ->
 t_case_exproto_subscribe(_) ->
     Mod = emqx_exproto_SUITE,
     SvrMod = emqx_exproto_echo_svr,
-    Svrs = SvrMod:start(),
+    Svrs = SvrMod:start(http),
     WaitTime = 5000,
     Sub = fun(Topic, ErrorCode) ->
         with_resource(

+ 2 - 0
apps/emqx_gateway_exproto/.gitignore

@@ -22,3 +22,5 @@ src/emqx_exproto_v_1_connection_adapter_bhvr.erl
 src/emqx_exproto_v_1_connection_adapter_client.erl
 src/emqx_exproto_v_1_connection_handler_bhvr.erl
 src/emqx_exproto_v_1_connection_handler_client.erl
+src/emqx_exproto_v_1_connection_unary_handler_bhvr.erl
+src/emqx_exproto_v_1_connection_unary_handler_client.erl

+ 39 - 0
apps/emqx_gateway_exproto/priv/protos/exproto.proto

@@ -41,8 +41,12 @@ service ConnectionAdapter {
   rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
 
   rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
+
+  rpc RawPublish(RawPublishRequest) returns (CodeResponse) {};
 }
 
+// Deprecated service.
+// Please using `ConnectionUnaryHandler` to replace it
 service ConnectionHandler {
 
   // -- socket layer
@@ -60,6 +64,32 @@ service ConnectionHandler {
   rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {};
 }
 
+// This service is an optimization of `ConnectionHandler`.
+// In the initial version, we expected to use streams to improve the efficiency
+// of requests. But unfortunately, events between different streams are out of
+// order. it causes the `OnSocketCreated` event to may arrive later than `OnReceivedBytes`.
+//
+// So we added the `ConnectionUnaryHandler` service since v4.3.21/v4.4.10 and forced
+// the use of Unary in it to avoid ordering problems.
+//
+// Recommend using `ConnectionUnaryHandler` to replace `ConnectionHandler`
+service ConnectionUnaryHandler {
+
+  // -- socket layer
+
+  rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {};
+
+  rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {};
+
+  rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {};
+
+  // -- pub/sub layer
+
+  rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {};
+
+  rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {};
+}
+
 message EmptySuccess { }
 
 enum ResultCode {
@@ -137,6 +167,15 @@ message PublishRequest {
   bytes payload = 4;
 }
 
+message RawPublishRequest {
+
+  string topic = 1;
+
+  uint32 qos = 2;
+
+  bytes payload = 3;
+}
+
 message SubscribeRequest {
 
   string conn = 1;

+ 80 - 65
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -45,7 +45,7 @@
     %% Context
     ctx :: emqx_gateway_ctx:context(),
     %% gRPC channel options
-    gcli :: map(),
+    gcli :: emqx_exproto_gcli:grpc_client_state(),
     %% Conn info
     conninfo :: emqx_types:conninfo(),
     %% Client info from `register` function
@@ -54,10 +54,6 @@
     conn_state :: conn_state(),
     %% Subscription
     subscriptions = #{},
-    %% Request queue
-    rqueue = queue:new(),
-    %% Inflight function name
-    inflight = undefined,
     %% Keepalive
     keepalive :: maybe(emqx_keepalive:keepalive()),
     %% Timers
@@ -150,9 +146,11 @@ init(
     },
     Options
 ) ->
+    GRpcChann = maps:get(grpc_client_channel, Options),
+    ServiceName = maps:get(grpc_client_service_name, Options),
+    GRpcClient = emqx_exproto_gcli:init(ServiceName, #{channel => GRpcChann}),
+
     Ctx = maps:get(ctx, Options),
-    GRpcChann = maps:get(handler, Options),
-    PoolName = maps:get(pool_name, Options),
     IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
 
     NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
@@ -170,7 +168,7 @@ init(
     },
     Channel = #channel{
         ctx = Ctx,
-        gcli = #{channel => GRpcChann, pool_name => PoolName},
+        gcli = GRpcClient,
         conninfo = NConnInfo,
         clientinfo = ClientInfo,
         conn_state = connecting,
@@ -188,9 +186,7 @@ init(
                 }
             )
     },
-    start_idle_checking_timer(
-        try_dispatch(on_socket_created, wrap(Req), Channel)
-    ).
+    dispatch(on_socket_created, Req, start_idle_checking_timer(Channel)).
 
 %% @private
 peercert(NoSsl, ConnInfo) when
@@ -239,7 +235,7 @@ start_idle_checking_timer(Channel) ->
     | {shutdown, Reason :: term(), channel()}.
 handle_in(Data, Channel) ->
     Req = #{bytes => Data},
-    {ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}.
+    {ok, dispatch(on_received_bytes, Req, Channel)}.
 
 -spec handle_deliver(list(emqx_types:deliver()), channel()) ->
     {ok, channel()}
@@ -276,7 +272,7 @@ handle_deliver(
         Delivers
     ),
     Req = #{messages => Msgs},
-    {ok, try_dispatch(on_received_messages, wrap(Req), Channel)}.
+    {ok, dispatch(on_received_messages, Req, Channel)}.
 
 -spec handle_timeout(reference(), Msg :: term(), channel()) ->
     {ok, channel()}
@@ -301,10 +297,13 @@ handle_timeout(
             NChannel = remove_timer_ref(alive_timer, Channel),
             %% close connection if keepalive timeout
             Replies = [{event, disconnected}, {close, keepalive_timeout}],
-            {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
+            NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{
+                closed_reason = keepalive_timeout
+            }),
+            {ok, Replies, NChannel1}
     end;
 handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
-    {shutdown, {error, {force_close, Reason}}, Channel};
+    {shutdown, Reason, Channel};
 handle_timeout(_TRef, force_close_idle, Channel) ->
     {shutdown, idle_timeout, Channel};
 handle_timeout(_TRef, Msg, Channel) ->
@@ -331,10 +330,20 @@ handle_call(
     Channel = #channel{conn_state = connected}
 ) ->
     ?SLOG(warning, #{
-        msg => "ingore_duplicated_authorized_command",
+        msg => "ingore_duplicated_authenticate_command",
         request_clientinfo => ClientInfo
     }),
     {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
+handle_call(
+    {auth, ClientInfo, _Password},
+    _From,
+    Channel = #channel{conn_state = disconnected}
+) ->
+    ?SLOG(warning, #{
+        msg => "authenticate_command_after_socket_disconnected",
+        request_clientinfo => ClientInfo
+    }),
+    {reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel};
 handle_call(
     {auth, ClientInfo0, Password},
     _From,
@@ -467,10 +476,21 @@ handle_call(kick, _From, Channel) ->
     {reply, ok, [{event, disconnected}, {close, kicked}], Channel};
 handle_call(discard, _From, Channel) ->
     {shutdown, discarded, ok, Channel};
-handle_call(Req, _From, Channel) ->
+handle_call(
+    Req,
+    _From,
+    Channel = #channel{
+        conn_state = ConnState,
+        clientinfo = ClientInfo,
+        closed_reason = ClosedReason
+    }
+) ->
     ?SLOG(warning, #{
         msg => "unexpected_call",
-        call => Req
+        call => Req,
+        conn_state => ConnState,
+        clientid => maps:get(clientid, ClientInfo, undefined),
+        closed_reason => ClosedReason
     }),
     {reply, {error, unexpected_call}, Channel}.
 
@@ -490,32 +510,50 @@ handle_cast(Req, Channel) ->
     | {shutdown, Reason :: term(), channel()}.
 handle_info(
     {sock_closed, Reason},
-    Channel = #channel{rqueue = Queue, inflight = Inflight}
+    Channel = #channel{gcli = GClient, closed_reason = ClosedReason}
 ) ->
-    case
-        queue:len(Queue) =:= 0 andalso
-            Inflight =:= undefined
-    of
+    case emqx_exproto_gcli:is_empty(GClient) of
         true ->
-            Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
+            Channel1 = ensure_disconnected(Reason, Channel),
             {shutdown, Reason, Channel1};
         _ ->
             %% delayed close process for flushing all callback funcs to gRPC server
-            Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
+            Channel1 =
+                case ClosedReason of
+                    undefined ->
+                        Channel#channel{closed_reason = Reason};
+                    _ ->
+                        Channel
+                end,
             Channel2 = ensure_timer(force_timer, Channel1),
-            {ok, ensure_disconnected({sock_closed, Reason}, Channel2)}
+            {ok, ensure_disconnected(Reason, Channel2)}
     end;
-handle_info({hreply, on_socket_created, ok}, Channel) ->
-    dispatch_or_close_process(Channel#channel{inflight = undefined});
-handle_info({hreply, FunName, ok}, Channel) when
-    FunName == on_socket_closed;
-    FunName == on_received_bytes;
-    FunName == on_received_messages;
-    FunName == on_timer_timeout
+handle_info(
+    {hreply, FunName, Result},
+    Channel0 = #channel{gcli = GClient0, timers = Timers}
+) when
+    FunName =:= on_socket_created;
+    FunName =:= on_socket_closed;
+    FunName =:= on_received_bytes;
+    FunName =:= on_received_messages;
+    FunName =:= on_timer_timeout
 ->
-    dispatch_or_close_process(Channel#channel{inflight = undefined});
-handle_info({hreply, FunName, {error, Reason}}, Channel) ->
-    {shutdown, {error, {FunName, Reason}}, Channel};
+    GClient = emqx_exproto_gcli:ack(FunName, GClient0),
+    Channel = Channel0#channel{gcli = GClient},
+
+    ShutdownNow =
+        emqx_exproto_gcli:is_empty(GClient) andalso
+            maps:get(force_timer, Timers, undefined) =/= undefined,
+    case Result of
+        ok when not ShutdownNow ->
+            GClient1 = emqx_exproto_gcli:maybe_shoot(GClient),
+            {ok, Channel#channel{gcli = GClient1}};
+        ok when ShutdownNow ->
+            Channel1 = cancel_timer(force_timer, Channel),
+            {shutdown, Channel1#channel.closed_reason, Channel1};
+        {error, Reason} ->
+            {shutdown, {error, {FunName, Reason}}, Channel}
+    end;
 handle_info({subscribe, _}, Channel) ->
     {ok, Channel};
 handle_info(Info, Channel) ->
@@ -528,7 +566,8 @@ handle_info(Info, Channel) ->
 -spec terminate(any(), channel()) -> channel().
 terminate(Reason, Channel) ->
     Req = #{reason => stringfy(Reason)},
-    try_dispatch(on_socket_closed, wrap(Req), Channel).
+    %% XXX: close streams?
+    dispatch(on_socket_closed, Req, Channel).
 
 %%--------------------------------------------------------------------
 %% Sub/UnSub
@@ -705,34 +744,10 @@ interval(alive_timer, #channel{keepalive = Keepalive}) ->
 %% Dispatch
 %%--------------------------------------------------------------------
 
-wrap(Req) ->
-    Req#{conn => base64:encode(term_to_binary(self()))}.
-
-dispatch_or_close_process(
-    Channel = #channel{
-        rqueue = Queue,
-        inflight = undefined,
-        gcli = GClient
-    }
-) ->
-    case queue:out(Queue) of
-        {empty, _} ->
-            case Channel#channel.conn_state of
-                disconnected ->
-                    {shutdown, Channel#channel.closed_reason, Channel};
-                _ ->
-                    {ok, Channel}
-            end;
-        {{value, {FunName, Req}}, NQueue} ->
-            emqx_exproto_gcli:async_call(FunName, Req, GClient),
-            {ok, Channel#channel{inflight = FunName, rqueue = NQueue}}
-    end.
-
-try_dispatch(FunName, Req, Channel = #channel{inflight = undefined, gcli = GClient}) ->
-    emqx_exproto_gcli:async_call(FunName, Req, GClient),
-    Channel#channel{inflight = FunName};
-try_dispatch(FunName, Req, Channel = #channel{rqueue = Queue}) ->
-    Channel#channel{rqueue = queue:in({FunName, Req}, Queue)}.
+dispatch(FunName, Req, Channel = #channel{gcli = GClient}) ->
+    Req1 = Req#{conn => base64:encode(term_to_binary(self()))},
+    NGClient = emqx_exproto_gcli:maybe_shoot(FunName, Req1, GClient),
+    Channel#channel{gcli = NGClient}.
 
 %%--------------------------------------------------------------------
 %% Format

+ 131 - 95
apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl

@@ -14,161 +14,197 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% the gRPC client worker for ConnectionHandler service
+%% the gRPC client agent for ConnectionHandler service
 -module(emqx_exproto_gcli).
 
--behaviour(gen_server).
-
 -include_lib("emqx/include/logger.hrl").
 
-%% APIs
--export([async_call/3]).
-
--export([start_link/2]).
+-logger_header("[ExProtoGCli]").
 
-%% gen_server callbacks
 -export([
-    init/1,
-    handle_call/3,
-    handle_cast/2,
-    handle_info/2,
-    terminate/2,
-    code_change/3
+    init/2,
+    maybe_shoot/1,
+    maybe_shoot/3,
+    ack/2,
+    is_empty/1
 ]).
 
--record(state, {
-    pool,
-    id,
-    streams
-}).
+-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client).
+-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client).
+
+-type service_name() :: 'ConnectionUnaryHandler' | 'ConnectionHandler'.
+
+-type grpc_client_state() ::
+    #{
+        owner := pid(),
+        service_name := service_name(),
+        client_opts := options(),
+        queue := queue:queue(),
+        inflight := atom() | undefined,
+        streams => map(),
+        middleman => pid() | undefined
+    }.
 
--define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client).
+-type options() ::
+    #{channel := term()}.
 
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
-start_link(Pool, Id) ->
-    gen_server:start_link(
-        {local, emqx_utils:proc_name(?MODULE, Id)},
-        ?MODULE,
-        [Pool, Id],
-        []
-    ).
-
--spec async_call(atom(), map(), map()) -> ok.
-async_call(
-    FunName,
-    Req = #{conn := Conn},
-    Options = #{pool_name := PoolName}
-) ->
-    case pick(PoolName, Conn) of
+-spec init(service_name(), options()) -> grpc_client_state().
+init(ServiceName, Options) ->
+    #{
+        owner => self(),
+        service_name => ServiceName,
+        client_opts => Options,
+        queue => queue:new(),
+        inflight => undefined
+    }.
+
+-spec maybe_shoot(atom(), map(), grpc_client_state()) -> grpc_client_state().
+maybe_shoot(FunName, Req, GState = #{inflight := undefined}) ->
+    shoot(FunName, Req, GState);
+maybe_shoot(FunName, Req, GState) ->
+    enqueue(FunName, Req, GState).
+
+-spec maybe_shoot(grpc_client_state()) -> grpc_client_state().
+maybe_shoot(GState = #{inflight := undefined, queue := Q}) ->
+    case queue:is_empty(Q) of
+        true ->
+            GState;
         false ->
-            reply(self(), FunName, {error, no_available_grpc_client});
-        Pid when is_pid(Pid) ->
-            cast(Pid, {rpc, FunName, Req, Options, self()})
-    end,
-    ok.
+            {{value, {FunName, Req}}, Q1} = queue:out(Q),
+            shoot(FunName, Req, GState#{queue => Q1})
+    end.
+
+-spec ack(atom(), grpc_client_state()) -> grpc_client_state().
+ack(FunName, GState = #{inflight := FunName}) ->
+    GState#{inflight => undefined, middleman => undefined};
+ack(_, _) ->
+    error(badarg).
+
+-spec is_empty(grpc_client_state()) -> boolean().
+is_empty(#{queue := Q, inflight := Inflight}) ->
+    Inflight == undefined andalso queue:is_empty(Q).
 
 %%--------------------------------------------------------------------
-%% cast, pick
+%% Internal funcs
 %%--------------------------------------------------------------------
 
--compile({inline, [cast/2, pick/2]}).
+enqueue(FunName, Req, GState = #{queue := Q}) ->
+    GState#{queue => queue:in({FunName, Req}, Q)}.
 
-cast(Deliver, Msg) ->
-    gen_server:cast(Deliver, Msg).
+shoot(FunName, Req, GState) ->
+    ServiceName = maps:get(service_name, GState),
+    shoot(ServiceName, FunName, Req, GState).
 
--spec pick(term(), term()) -> pid() | false.
-pick(PoolName, Conn) ->
-    gproc_pool:pick_worker(PoolName, Conn).
+shoot(
+    'ConnectionUnaryHandler',
+    FunName,
+    Req,
+    GState = #{owner := Owner, client_opts := Options}
+) ->
+    Pid =
+        spawn(
+            fun() ->
+                try
+                    Result = request(FunName, Req, Options),
+                    hreply(Owner, Result, FunName)
+                catch
+                    T:R:Stk ->
+                        hreply(Owner, {error, {{T, R}, Stk}}, FunName)
+                end
+            end
+        ),
+    GState#{inflight => FunName, middleman => Pid};
+shoot(
+    'ConnectionHandler',
+    FunName,
+    Req,
+    GState
+) ->
+    GState1 = streaming(FunName, Req, GState),
+    GState1#{inflight => FunName}.
 
 %%--------------------------------------------------------------------
-%% gen_server callbacks
-%%--------------------------------------------------------------------
-
-init([Pool, Id]) ->
-    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    {ok, #state{pool = Pool, id = Id, streams = #{}}}.
+%% streaming
 
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
-    case ensure_stream_opened(Fun, Options, Streams) of
+streaming(
+    FunName,
+    Req,
+    GState = #{owner := Owner, client_opts := Options}
+) ->
+    Streams = maps:get(streams, GState, #{}),
+    case ensure_stream_opened(FunName, Options, Streams) of
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "request_grpc_server_failed",
-                function => {?CONN_ADAPTER_MOD, Fun, Options},
+                function => {FunName, Options},
                 reason => Reason
             }),
-            reply(From, Fun, {error, Reason}),
-            {noreply, State#state{streams = Streams#{Fun => undefined}}};
+            hreply(Owner, {error, Reason}, FunName),
+            {ok, GState};
         {ok, Stream} ->
             case catch grpc_client:send(Stream, Req) of
                 ok ->
                     ?SLOG(debug, #{
                         msg => "send_grpc_request_succeed",
-                        function => {?CONN_ADAPTER_MOD, Fun},
+                        function => FunName,
                         request => Req
                     }),
-                    reply(From, Fun, ok),
-                    {noreply, State#state{streams = Streams#{Fun => Stream}}};
+                    hreply(Owner, ok, FunName),
+                    GState#{streams => Streams#{FunName => Stream}};
                 {'EXIT', {not_found, _Stk}} ->
                     %% Not found the stream, reopen it
                     ?SLOG(info, #{
                         msg => "cannt_find_old_stream_ref",
-                        function => {?CONN_ADAPTER_MOD, Fun}
+                        function => FunName
                     }),
-                    handle_cast(
-                        {rpc, Fun, Req, Options, From},
-                        State#state{streams = maps:remove(Fun, Streams)}
-                    );
+                    streaming(FunName, Req, GState#{streams => maps:remove(FunName, Streams)});
                 {'EXIT', {timeout, _Stk}} ->
                     ?SLOG(error, #{
                         msg => "send_grpc_request_timeout",
-                        function => {?CONN_ADAPTER_MOD, Fun},
+                        function => FunName,
                         request => Req
                     }),
-                    reply(From, Fun, {error, timeout}),
-                    {noreply, State#state{streams = Streams#{Fun => Stream}}};
+                    hreply(Owner, {error, timeout}, FunName),
+                    GState;
                 {'EXIT', {Reason1, Stk}} ->
                     ?SLOG(error, #{
                         msg => "send_grpc_request_failed",
-                        function => {?CONN_ADAPTER_MOD, Fun},
+                        function => FunName,
                         request => Req,
                         error => Reason1,
                         stacktrace => Stk
                     }),
-                    reply(From, Fun, {error, Reason1}),
-                    {noreply, State#state{streams = Streams#{Fun => undefined}}}
+                    hreply(Owner, {error, Reason1}, FunName),
+                    GState
             end
     end.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-reply(Pid, Fun, Result) ->
-    Pid ! {hreply, Fun, Result},
-    ok.
-
-ensure_stream_opened(Fun, Options, Streams) ->
-    case maps:get(Fun, Streams, undefined) of
+ensure_stream_opened(FunName, Options, Streams) ->
+    case maps:get(FunName, Streams, undefined) of
         undefined ->
-            case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of
+            case apply(?CONN_HANDLER_MOD, FunName, [Options]) of
                 {ok, Stream} -> {ok, Stream};
                 {error, Reason} -> {error, Reason}
             end;
         Stream ->
             {ok, Stream}
     end.
+
+%%--------------------------------------------------------------------
+%% unary
+
+request(FunName, Req, Options) ->
+    case apply(?CONN_UNARY_HANDLER_MOD, FunName, [Req, Options]) of
+        {ok, _EmptySucc, _Md} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+hreply(Owner, Result, FunName) ->
+    Owner ! {hreply, FunName, Result},
+    ok.

+ 16 - 0
apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl

@@ -33,6 +33,7 @@
     authenticate/2,
     start_timer/2,
     publish/2,
+    raw_publish/2,
     subscribe/2,
     unsubscribe/2
 ]).
@@ -129,6 +130,19 @@ publish(Req, Md) ->
     }),
     {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
 
+-spec raw_publish(emqx_exproto_pb:raw_publish_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) ->
+    ?SLOG(debug, #{
+        msg => "recv_grpc_function_call",
+        function => ?FUNCTION_NAME,
+        request => Req
+    }),
+    Msg = emqx_message:make(exproto, Qos, Topic, Payload),
+    _ = emqx_broker:safe_publish(Msg),
+    {ok, response(ok), Md}.
+
 -spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) ->
     {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
     | {error, grpc_cowboy_h:error_response()}.
@@ -176,6 +190,8 @@ call(ConnStr, Req) ->
             {error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>};
         exit:noproc ->
             {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>};
+        exit:{noproc, _} ->
+            {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>};
         exit:timeout ->
             {error, ?RESP_UNKNOWN, <<"Connection is not answered">>};
         Class:Reason:Stk ->

+ 9 - 0
apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl

@@ -74,6 +74,15 @@ fields(exproto_grpc_server) ->
 fields(exproto_grpc_handler) ->
     [
         {address, sc(binary(), #{required => true, desc => ?DESC(exproto_grpc_handler_address)})},
+        {service_name,
+            sc(
+                hoconsc:union(['ConnectionHandler', 'ConnectionUnaryHandler']),
+                #{
+                    required => true,
+                    default => 'ConnectionUnaryHandler',
+                    desc => ?DESC(exproto_grpc_handler_service_name)
+                }
+            )},
         {ssl_options,
             sc(
                 ref(emqx_schema, "ssl_client_opts"),

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,6 +1,6 @@
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 32 - 25
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.erl

@@ -62,22 +62,16 @@ on_gateway_load(
         GwName,
         maps:get(handler, Config, undefined)
     ),
+    ServiceName = ensure_service_name(Config),
     %% XXX: How to monitor it ?
     _ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
 
-    %% XXX: How to monitor it ?
-    PoolName = pool_name(GwName),
-    PoolSize = emqx_vm:schedulers() * 2,
-    {ok, PoolSup} = emqx_pool_sup:start_link(
-        PoolName,
-        hash,
-        PoolSize,
-        {emqx_exproto_gcli, start_link, []}
-    ),
-
     NConfig = maps:without(
         [server, handler],
-        Config#{pool_name => PoolName}
+        Config#{
+            grpc_client_channel => GwName,
+            grpc_client_service_name => ServiceName
+        }
     ),
     Listeners = emqx_gateway_utils:normalize_config(
         NConfig#{handler => GwName}
@@ -93,7 +87,7 @@ on_gateway_load(
         )
     of
         {ok, ListenerPids} ->
-            {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}};
+            {ok, ListenerPids, _GwState = #{ctx => Ctx}};
         {error, {Reason, Listener}} ->
             throw(
                 {badconf, #{
@@ -126,11 +120,9 @@ on_gateway_unload(
         name := GwName,
         config := Config
     },
-    _GwState = #{pool := PoolSup}
+    _GwState
 ) ->
     Listeners = emqx_gateway_utils:normalize_config(Config),
-    %% Stop funcs???
-    exit(PoolSup, kill),
     stop_grpc_server(GwName),
     stop_grpc_client_channel(GwName),
     stop_listeners(GwName, Listeners).
@@ -139,8 +131,6 @@ on_gateway_unload(
 %% Internal funcs
 %%--------------------------------------------------------------------
 
-start_grpc_server(_GwName, undefined) ->
-    undefined;
 start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
     Services = #{
         protos => [emqx_exproto_pb],
@@ -179,15 +169,24 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
                     reason => illegal_grpc_server_confs
                 }}
             )
-    end.
+    end;
+start_grpc_server(_GwName, Options) ->
+    throw(
+        {badconf, #{
+            key => server,
+            value => Options,
+            reason => illegal_grpc_server_confs
+        }}
+    ).
 
 stop_grpc_server(GwName) ->
     _ = grpc:stop_server(GwName),
     console_print("Stop ~s gRPC server successfully.~n", [GwName]).
 
-start_grpc_client_channel(_GwName, undefined) ->
-    undefined;
-start_grpc_client_channel(GwName, Options = #{address := Address}) ->
+start_grpc_client_channel(
+    GwName,
+    Options = #{address := Address}
+) ->
     #{host := Host, port := Port} =
         case emqx_http_lib:uri_parse(Address) of
             {ok, URIMap0} ->
@@ -201,7 +200,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
                     }}
                 )
         end,
-    case emqx_utils_maps:deep_get([ssl, enable], Options, false) of
+    case emqx_utils_maps:deep_get([ssl_options, enable], Options, false) of
         false ->
             SvrAddr = compose_http_uri(http, Host, Port),
             grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
@@ -217,7 +216,15 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
 
             SvrAddr = compose_http_uri(https, Host, Port),
             grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
-    end.
+    end;
+start_grpc_client_channel(_GwName, Options) ->
+    throw(
+        {badconf, #{
+            key => handler,
+            value => Options,
+            reason => ililegal_grpc_client_confs
+        }}
+    ).
 
 compose_http_uri(Scheme, Host, Port) ->
     lists:flatten(
@@ -230,8 +237,8 @@ stop_grpc_client_channel(GwName) ->
     _ = grpc_client_sup:stop_channel_pool(GwName),
     ok.
 
-pool_name(GwName) ->
-    list_to_atom(lists:concat([GwName, "_gcli_pool"])).
+ensure_service_name(Config) ->
+    emqx_utils_maps:deep_get([handler, service_name], Config, 'ConnectionUnaryHandler').
 
 -ifndef(TEST).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).

+ 134 - 27
apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

@@ -19,8 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -import(
     emqx_exproto_echo_svr,
@@ -28,6 +31,7 @@
         frame_connect/2,
         frame_connack/1,
         frame_publish/3,
+        frame_raw_publish/3,
         frame_puback/1,
         frame_subscribe/2,
         frame_suback/1,
@@ -37,19 +41,24 @@
     ]
 ).
 
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
-
 -define(TCPOPTS, [binary, {active, false}]).
 -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
 
+-define(PORT, 7993).
+
+-define(DEFAULT_CLIENT, #{
+    proto_name => <<"demo">>,
+    proto_ver => <<"v0.1">>,
+    clientid => <<"test_client_1">>
+}).
+
 %%--------------------------------------------------------------------
 -define(CONF_DEFAULT, <<
     "\n"
     "gateway.exproto {\n"
     "  server.bind = 9100,\n"
     "  handler.address = \"http://127.0.0.1:9001\"\n"
+    "  handler.service_name = \"ConnectionHandler\"\n"
     "  listeners.tcp.default {\n"
     "    bind = 7993,\n"
     "    acceptors = 8\n"
@@ -62,43 +71,110 @@
 %%--------------------------------------------------------------------
 
 all() ->
-    [{group, Name} || Name <- metrics()].
+    [
+        {group, tcp_listener},
+        {group, ssl_listener},
+        {group, udp_listener},
+        {group, dtls_listener},
+        {group, https_grpc_server},
+        {group, streaming_connection_handler}
+    ].
 
 suite() ->
     [{timetrap, {seconds, 30}}].
 
 groups() ->
-    Cases = emqx_common_test_helpers:all(?MODULE),
-    [{Name, Cases} || Name <- metrics()].
-
-%% @private
-metrics() ->
-    [tcp, ssl, udp, dtls].
-
-init_per_group(GrpName, Cfg) ->
+    MainCases = [
+        t_keepalive_timeout,
+        t_mountpoint_echo,
+        t_raw_publish,
+        t_auth_deny,
+        t_acl_deny,
+        t_hook_connected_disconnected,
+        t_hook_session_subscribed_unsubscribed,
+        t_hook_message_delivered
+    ],
+    [
+        {tcp_listener, [sequence], MainCases},
+        {ssl_listener, [sequence], MainCases},
+        {udp_listener, [sequence], MainCases},
+        {dtls_listener, [sequence], MainCases},
+        {streaming_connection_handler, [sequence], MainCases},
+        {https_grpc_server, [sequence], MainCases}
+    ].
+
+init_per_group(GrpName, Cfg) when
+    GrpName == tcp_listener;
+    GrpName == ssl_listener;
+    GrpName == udp_listener;
+    GrpName == dtls_listener
+->
+    LisType =
+        case GrpName of
+            tcp_listener -> tcp;
+            ssl_listener -> ssl;
+            udp_listener -> udp;
+            dtls_listener -> dtls
+        end,
+    init_per_group(LisType, 'ConnectionUnaryHandler', http, Cfg);
+init_per_group(https_grpc_server, Cfg) ->
+    init_per_group(tcp, 'ConnectionUnaryHandler', https, Cfg);
+init_per_group(streaming_connection_handler, Cfg) ->
+    init_per_group(tcp, 'ConnectionHandler', http, Cfg);
+init_per_group(_, Cfg) ->
+    init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg).
+
+init_per_group(LisType, ServiceName, Scheme, Cfg) ->
+    Svrs = emqx_exproto_echo_svr:start(Scheme),
     application:load(emqx_gateway_exproto),
-    put(grpname, GrpName),
-    Svrs = emqx_exproto_echo_svr:start(),
-    emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1),
-    [{servers, Svrs}, {listener_type, GrpName} | Cfg].
+    emqx_common_test_helpers:start_apps(
+        [emqx_authn, emqx_gateway],
+        fun(App) ->
+            set_special_cfg(App, LisType, ServiceName, Scheme)
+        end
+    ),
+    [
+        {servers, Svrs},
+        {listener_type, LisType},
+        {service_name, ServiceName},
+        {grpc_client_scheme, Scheme}
+        | Cfg
+    ].
 
 end_per_group(_, Cfg) ->
     emqx_config:erase(gateway),
     emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn]),
     emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
 
-set_special_cfg(emqx_gateway) ->
-    LisType = get(grpname),
+init_per_testcase(TestCase, Cfg) when
+    TestCase == t_enter_passive_mode
+->
+    case proplists:get_value(listener_type, Cfg) of
+        udp -> {skip, ignore};
+        _ -> Cfg
+    end;
+init_per_testcase(_TestCase, Cfg) ->
+    Cfg.
+
+end_per_testcase(_TestCase, _Cfg) ->
+    ok.
+
+set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) ->
+    Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
     emqx_config:put(
         [gateway, exproto],
         #{
             server => #{bind => 9100},
             idle_timeout => 5000,
-            handler => #{address => "http://127.0.0.1:9001"},
+            handler => #{
+                address => Addrs,
+                service_name => ServiceName,
+                ssl_options => #{enable => Scheme == https}
+            },
             listeners => listener_confs(LisType)
         }
     );
-set_special_cfg(_App) ->
+set_special_cfg(_, _, _, _) ->
     ok.
 
 listener_confs(Type) ->
@@ -112,9 +188,6 @@ default_config() ->
 %% Tests cases
 %%--------------------------------------------------------------------
 
-t_start_stop(_) ->
-    ok.
-
 t_mountpoint_echo(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
@@ -158,6 +231,40 @@ t_mountpoint_echo(Cfg) ->
     end,
     close(Sock).
 
+t_raw_publish(Cfg) ->
+    SockType = proplists:get_value(listener_type, Cfg),
+    Sock = open(SockType),
+
+    Client = #{
+        proto_name => <<"demo">>,
+        proto_ver => <<"v0.1">>,
+        clientid => <<"test_client_1">>,
+        mountpoint => <<"ct/">>
+    },
+    Password = <<"123456">>,
+
+    ConnBin = frame_connect(Client, Password),
+    ConnAckBin = frame_connack(0),
+
+    send(Sock, ConnBin),
+    {ok, ConnAckBin} = recv(Sock, 5000),
+
+    PubBin2 = frame_raw_publish(<<"t/up">>, 0, <<"echo">>),
+    PubAckBin = frame_puback(0),
+
+    %% mountpoint is not used in raw publish
+    emqx:subscribe(<<"t/up">>),
+
+    send(Sock, PubBin2),
+    {ok, PubAckBin} = recv(Sock, 5000),
+
+    receive
+        {deliver, _, _} -> ok
+    after 1000 ->
+        error(echo_not_running)
+    end,
+    close(Sock).
+
 t_auth_deny(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
@@ -264,7 +371,7 @@ t_keepalive_timeout(Cfg) ->
             ?assertMatch(
                 {ok, #{
                     clientid := ClientId1,
-                    reason := {shutdown, {sock_closed, keepalive_timeout}}
+                    reason := {shutdown, keepalive_timeout}
                 }},
                 ?block_until(#{?snk_kind := conn_process_terminated}, 8000)
             );
@@ -272,7 +379,7 @@ t_keepalive_timeout(Cfg) ->
             ?assertMatch(
                 {ok, #{
                     clientid := ClientId1,
-                    reason := {shutdown, {sock_closed, keepalive_timeout}}
+                    reason := {shutdown, keepalive_timeout}
                 }},
                 ?block_until(#{?snk_kind := conn_process_terminated}, 12000)
             ),

+ 63 - 21
apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl

@@ -19,7 +19,7 @@
 -behaviour(emqx_exproto_v_1_connection_handler_bhvr).
 
 -export([
-    start/0,
+    start/1,
     stop/1
 ]).
 
@@ -27,12 +27,16 @@
     frame_connect/2,
     frame_connack/1,
     frame_publish/3,
+    frame_raw_publish/3,
     frame_puback/1,
     frame_subscribe/2,
     frame_suback/1,
     frame_unsubscribe/1,
     frame_unsuback/1,
-    frame_disconnect/0
+    frame_disconnect/0,
+    handle_in/3,
+    handle_out/2,
+    handle_out/3
 ]).
 
 -export([
@@ -45,19 +49,6 @@
 
 -define(LOG(Fmt, Args), ct:pal(Fmt, Args)).
 
--define(HTTP, #{
-    grpc_opts => #{
-        service_protos => [emqx_exproto_pb],
-        services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}
-    },
-    listen_opts => #{
-        port => 9001,
-        socket_options => []
-    },
-    pool_opts => #{size => 8},
-    transport_opts => #{ssl => false}
-}).
-
 -define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
 
 -define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})).
@@ -65,6 +56,7 @@
 -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})).
 -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})).
 -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})).
+-define(raw_publish(Req), ?CLIENT:raw_publish(Req, #{channel => ct_test_channel})).
 -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})).
 -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})).
 
@@ -77,6 +69,7 @@
 -define(TYPE_UNSUBSCRIBE, 7).
 -define(TYPE_UNSUBACK, 8).
 -define(TYPE_DISCONNECT, 9).
+-define(TYPE_RAW_PUBLISH, 10).
 
 -define(loop_recv_and_reply_empty_success(Stream),
     ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)
@@ -104,9 +97,9 @@ end).
 %% APIs
 %%--------------------------------------------------------------------
 
-start() ->
+start(Scheme) ->
     application:ensure_all_started(grpc),
-    [ensure_channel(), start_server()].
+    [ensure_channel(), start_server(Scheme)].
 
 ensure_channel() ->
     case grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}) of
@@ -114,12 +107,40 @@ ensure_channel() ->
         {ok, Pid} -> {ok, Pid}
     end.
 
-start_server() ->
+start_server(Scheme) when Scheme == http; Scheme == https ->
     Services = #{
         protos => [emqx_exproto_pb],
-        services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}
+        services => #{
+            'emqx.exproto.v1.ConnectionHandler' => ?MODULE,
+            'emqx.exproto.v1.ConnectionUnaryHandler' => emqx_exproto_unary_echo_svr
+        }
     },
-    Options = [],
+    CertsDir = filename:join(
+        [
+            emqx_common_test_helpers:proj_root(),
+            "apps",
+            "emqx",
+            "etc",
+            "certs"
+        ]
+    ),
+
+    Options =
+        case Scheme of
+            https ->
+                CAfile = filename:join([CertsDir, "cacert.pem"]),
+                Keyfile = filename:join([CertsDir, "key.pem"]),
+                Certfile = filename:join([CertsDir, "cert.pem"]),
+                [
+                    {ssl_options, [
+                        {cacertfile, CAfile},
+                        {certfile, Certfile},
+                        {keyfile, Keyfile}
+                    ]}
+                ];
+            _ ->
+                []
+        end,
     grpc:start_server(?MODULE, 9001, Services, Options).
 
 stop([_ChannPid, _SvrPid]) ->
@@ -249,6 +270,17 @@ handle_in(Conn, ?TYPE_PUBLISH, #{
         _ ->
             handle_out(Conn, ?TYPE_PUBACK, 1)
     end;
+handle_in(Conn, ?TYPE_RAW_PUBLISH, #{
+    <<"topic">> := Topic,
+    <<"qos">> := Qos,
+    <<"payload">> := Payload
+}) ->
+    case ?raw_publish(#{topic => Topic, qos => Qos, payload => Payload}) of
+        {ok, #{code := 'SUCCESS'}, _} ->
+            handle_out(Conn, ?TYPE_PUBACK, 0);
+        _ ->
+            handle_out(Conn, ?TYPE_PUBACK, 1)
+    end;
 handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) ->
     case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of
         {ok, #{code := 'SUCCESS'}, _} ->
@@ -275,7 +307,9 @@ handle_out(Conn, ?TYPE_SUBACK, Code) ->
 handle_out(Conn, ?TYPE_UNSUBACK, Code) ->
     ?send(#{conn => Conn, bytes => frame_unsuback(Code)});
 handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) ->
-    ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}).
+    ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)});
+handle_out(Conn, ?TYPE_RAW_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) ->
+    ?send(#{conn => Conn, bytes => frame_raw_publish(Topic, Qos, Payload)}).
 
 handle_out(Conn, ?TYPE_DISCONNECT) ->
     ?send(#{conn => Conn, bytes => frame_disconnect()}).
@@ -300,6 +334,14 @@ frame_publish(Topic, Qos, Payload) ->
         payload => Payload
     }).
 
+frame_raw_publish(Topic, Qos, Payload) ->
+    emqx_utils_json:encode(#{
+        type => ?TYPE_RAW_PUBLISH,
+        topic => Topic,
+        qos => Qos,
+        payload => Payload
+    }).
+
 frame_puback(Code) ->
     emqx_utils_json:encode(#{type => ?TYPE_PUBACK, code => Code}).
 

+ 98 - 0
apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl

@@ -0,0 +1,98 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exproto_unary_echo_svr).
+
+-behavior(emqx_exproto_v_1_connection_unary_handler_bhvr).
+
+-import(
+    emqx_exproto_echo_svr,
+    [
+        handle_in/3,
+        handle_out/2,
+        handle_out/3
+    ]
+).
+
+-export([
+    on_socket_created/2,
+    on_received_bytes/2,
+    on_socket_closed/2,
+    on_timer_timeout/2,
+    on_received_messages/2
+]).
+
+-define(LOG(Fmt, Args), ct:pal(Fmt, Args)).
+
+-define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
+
+-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})).
+
+-define(TYPE_CONNECT, 1).
+-define(TYPE_CONNACK, 2).
+-define(TYPE_PUBLISH, 3).
+-define(TYPE_PUBACK, 4).
+-define(TYPE_SUBSCRIBE, 5).
+-define(TYPE_SUBACK, 6).
+-define(TYPE_UNSUBSCRIBE, 7).
+-define(TYPE_UNSUBACK, 8).
+-define(TYPE_DISCONNECT, 9).
+-define(TYPE_RAW_PUBLISH, 10).
+
+%%--------------------------------------------------------------------
+%% callbacks
+%%--------------------------------------------------------------------
+
+-spec on_socket_created(emqx_exproto_pb:socket_created_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+on_socket_created(_Req, _Md) ->
+    {ok, #{}, _Md}.
+
+-spec on_socket_closed(emqx_exproto_pb:socket_closed_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+on_socket_closed(_Req, _Md) ->
+    {ok, #{}, _Md}.
+
+-spec on_received_bytes(emqx_exproto_pb:received_bytes_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+on_received_bytes(#{conn := Conn, bytes := Bytes}, _Md) ->
+    #{<<"type">> := Type} = Params = emqx_utils_json:decode(Bytes, [return_maps]),
+    _ = handle_in(Conn, Type, Params),
+    {ok, #{}, _Md}.
+
+-spec on_timer_timeout(emqx_exproto_pb:timer_timeout_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+on_timer_timeout(#{conn := Conn, type := 'KEEPALIVE'}, _Md) ->
+    ?LOG("Close this connection ~p due to keepalive timeout", [Conn]),
+    handle_out(Conn, ?TYPE_DISCONNECT),
+    ?close(#{conn => Conn}),
+    {ok, #{}, _Md}.
+
+-spec on_received_messages(emqx_exproto_pb:received_messages_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+on_received_messages(#{conn := Conn, messages := Messages}, _Md) ->
+    lists:foreach(
+        fun(Message) ->
+            handle_out(Conn, ?TYPE_PUBLISH, Message)
+        end,
+        Messages
+    ),
+    {ok, #{}, _Md}.

+ 1 - 0
changes/ce/feat-10598.en.md

@@ -0,0 +1 @@
+Provide a callback method of Unary type in ExProto to avoid possible message disorder issues.

+ 1 - 1
mix.exs

@@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do
       {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.15.2", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
-      {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
+      {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true},
       {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},

+ 1 - 1
rebar.config

@@ -64,7 +64,7 @@
     , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
-    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
+    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
     , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}

+ 9 - 0
rel/i18n/emqx_exproto_schema.hocon

@@ -6,6 +6,15 @@ exproto.desc:
 exproto_grpc_handler_address.desc:
 """gRPC server address."""
 
+exproto_grpc_handler_service_name.desc:
+"""The service name to handle the connection events.
+In the initial version, we expected to use streams to improve the efficiency
+of requests in `ConnectionHandler`. But unfortunately, events between different
+streams are out of order. It causes the `OnSocketCreated` event to may arrive
+later than `OnReceivedBytes`.
+So we added the `ConnectionUnaryHandler` service since v5.0.25 and forced
+the use of Unary in it to avoid ordering problems."""
+
 exproto_grpc_handler_ssl.desc:
 """SSL configuration for the gRPC client."""