Просмотр исходного кода

feat(exproto): use client streaming APIs for handler

- Use the gRPC client streaming APIs to improve the
  ConnectionHandler server performance.
- Change the 'conn' field type to term binary
JianBo He 5 лет назад
Родитель
Сommit
d360e7ead1

+ 2 - 2
apps/emqx_exhook/rebar.config

@@ -1,11 +1,11 @@
 %%-*- mode: erlang -*-
 {plugins,
  [rebar3_proper,
-  {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.9.1"}}}
+  {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}}
 ]}.
 
 {deps,
- [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.5.0"}}}
+ [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}}
 ]}.
 
 {grpc,

+ 5 - 5
apps/emqx_exproto/priv/protos/exproto.proto

@@ -47,17 +47,17 @@ service ConnectionHandler {
 
   // -- socket layer
 
-  rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {};
+  rpc OnSocketCreated(stream SocketCreatedRequest) returns (EmptySuccess) {};
 
-  rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {};
+  rpc OnSocketClosed(stream SocketClosedRequest) returns (EmptySuccess) {};
 
-  rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {};
+  rpc OnReceivedBytes(stream ReceivedBytesRequest) returns (EmptySuccess) {};
 
   // -- pub/sub layer
 
-  rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {};
+  rpc OnTimerTimeout(stream TimerTimeoutRequest) returns (EmptySuccess) {};
 
-  rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {};
+  rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {};
 }
 
 message EmptySuccess { }

+ 2 - 2
apps/emqx_exproto/rebar.config

@@ -9,11 +9,11 @@
             {parse_transform}]}.
 {plugins,
  [rebar3_proper,
-  {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.9.1"}}}
+  {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}}
 ]}.
 
 {deps,
- [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.5.0"}}}
+ [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}}
  ]}.
 
 {grpc,

+ 5 - 4
apps/emqx_exproto/src/emqx_exproto_channel.erl

@@ -364,7 +364,8 @@ handle_info({sock_closed, Reason},
     case queue:len(Queue) =:= 0
          andalso Inflight =:= undefined of
         true ->
-            {shutdown, {sock_closed, Reason}, Channel};
+            Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
+            {shutdown, {sock_closed, Reason}, Channel1};
         _ ->
             %% delayed close process for flushing all callback funcs to gRPC server
             Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
@@ -372,9 +373,9 @@ handle_info({sock_closed, Reason},
             {ok, ensure_disconnected({sock_closed, Reason}, Channel2)}
     end;
 
-handle_info({hreply, on_socket_created, {ok, _}}, Channel) ->
+handle_info({hreply, on_socket_created, ok}, Channel) ->
     dispatch_or_close_process(Channel#channel{inflight = undefined});
-handle_info({hreply, FunName, {ok, _}}, Channel)
+handle_info({hreply, FunName, ok}, Channel)
   when FunName == on_socket_closed;
        FunName == on_received_bytes;
        FunName == on_received_messages;
@@ -525,7 +526,7 @@ interval(alive_timer, #channel{keepalive = Keepalive}) ->
 %%--------------------------------------------------------------------
 
 wrap(Req) ->
-     Req#{conn => pid_to_list(self())}.
+     Req#{conn => base64:encode(term_to_binary(self()))}.
 
 dispatch_or_close_process(Channel = #channel{
                                        rqueue = Queue,

+ 40 - 22
apps/emqx_exproto/src/emqx_exproto_gcli.erl

@@ -37,6 +37,12 @@
         , code_change/3
         ]).
 
+-record(state, {
+          pool,
+          id,
+          streams
+         }).
+
 -define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client).
 
 %%--------------------------------------------------------------------
@@ -68,32 +74,34 @@ pick(Conn) ->
 
 init([Pool, Id]) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
-    {ok, #{pool => Pool, id => Id}}.
+    {ok, #state{pool = Pool, id = Id, streams = #{}}}.
 
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
-handle_cast({rpc, Fun, Req, Options, From}, State) ->
-    try
-        case apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of
-            {ok, Resp, _Metadata} ->
-                ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]),
-                reply(From, Fun, {ok, Resp});
-            {error, {Code, Msg}, _Metadata} ->
-                ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
-                        [?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]),
-                reply(From, Fun, {error, {Code, Msg}});
-            {error, Reason} ->
-                ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
-                        [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]),
-                reply(From, Fun, {error, Reason})
-        end
-    catch _ : Rsn : Stk ->
-        ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
-             [?CONN_ADAPTER_MOD, Fun, Req, Options, Rsn, Stk]),
-        reply(From, Fun, {error, Rsn})
-    end,
-    {noreply, State}.
+handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
+    case ensure_stream_opened(Fun, Options, Streams) of
+        {error, Reason} ->
+            ?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p",
+                    [?CONN_ADAPTER_MOD, Fun, Options, Reason]),
+            reply(From, Fun, {error, Reason}),
+            {noreply, State#state{streams = Streams#{Fun => undefined}}};
+        {ok, Stream} ->
+            case catch grpc_client:send(Stream, Req) of
+                ok ->
+                    ?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]),
+                    reply(From, Fun, ok),
+                    {noreply, State#state{streams = Streams#{Fun => Stream}}};
+                {'EXIT', {timeout, _Stk}} ->
+                    ?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]),
+                    reply(From, Fun, {error, timeout}),
+                    {noreply, State#state{streams = Streams#{Fun => Stream}}};
+                {'EXIT', {Reason1, _Stk}} ->
+                    ?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]),
+                    reply(From, Fun, {error, Reason1}),
+                    {noreply, State#state{streams = Streams#{Fun => undefined}}}
+            end
+    end.
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -111,3 +119,13 @@ code_change(_OldVsn, State, _Extra) ->
 reply(Pid, Fun, Result) ->
     Pid ! {hreply, Fun, Result},
     ok.
+
+ensure_stream_opened(Fun, Options, Streams) ->
+    case maps:get(Fun, Streams, undefined) of
+        undefined ->
+            case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of
+                {ok, Stream} -> {ok, Stream};
+                {error, Reason} -> {error, Reason}
+            end;
+        Stream -> {ok, Stream}
+    end.

+ 2 - 2
apps/emqx_exproto/src/emqx_exproto_gsvr.erl

@@ -115,10 +115,10 @@ unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
 %%--------------------------------------------------------------------
 
 to_pid(ConnStr) ->
-    list_to_pid(binary_to_list(ConnStr)).
+    binary_to_term(base64:decode(ConnStr)).
 
 call(ConnStr, Req) ->
-    case catch  to_pid(ConnStr) of
+    case catch to_pid(ConnStr) of
         {'EXIT', {badarg, _}} ->
             {error, ?RESP_PARAMS_TYPE_ERROR,
                     <<"The conn type error">>};

+ 0 - 1
apps/emqx_exproto/test/emqx_exproto_SUITE.erl

@@ -239,7 +239,6 @@ t_hook_connected_disconnected(Cfg) ->
     emqx:hook('client.connected', HookFun1),
     emqx:hook('client.disconnected', HookFun2),
 
-
     send(Sock, ConnBin),
     {ok, ConnAckBin} = recv(Sock, 5000),
 

+ 85 - 56
apps/emqx_exproto/test/emqx_exproto_echo_svr.erl

@@ -40,6 +40,8 @@
         , on_received_messages/2
         ]).
 
+-define(LOG(Fmt, Args), io:format(standard_error, Fmt, Args)).
+
 -define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb],
                                services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}},
                 listen_opts => #{port => 9001,
@@ -48,23 +50,44 @@
                 transport_opts => #{ssl => false}}).
 
 -define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
--define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})).
--define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})).
+
+-define(send(Req),         ?CLIENT:send(Req, #{channel => ct_test_channel})).
+-define(close(Req),        ?CLIENT:close(Req, #{channel => ct_test_channel})).
 -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})).
--define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})).
--define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})).
--define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})).
--define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})).
-
--define(TYPE_CONNECT, 1).
--define(TYPE_CONNACK, 2).
--define(TYPE_PUBLISH, 3).
--define(TYPE_PUBACK, 4).
--define(TYPE_SUBSCRIBE, 5).
--define(TYPE_SUBACK, 6).
+-define(start_timer(Req),  ?CLIENT:start_timer(Req, #{channel => ct_test_channel})).
+-define(publish(Req),      ?CLIENT:publish(Req, #{channel => ct_test_channel})).
+-define(subscribe(Req),    ?CLIENT:subscribe(Req, #{channel => ct_test_channel})).
+-define(unsubscribe(Req),  ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})).
+
+-define(TYPE_CONNECT,     1).
+-define(TYPE_CONNACK,     2).
+-define(TYPE_PUBLISH,     3).
+-define(TYPE_PUBACK,      4).
+-define(TYPE_SUBSCRIBE,   5).
+-define(TYPE_SUBACK,      6).
 -define(TYPE_UNSUBSCRIBE, 7).
--define(TYPE_UNSUBACK, 8).
--define(TYPE_DISCONNECT, 9).
+-define(TYPE_UNSUBACK,    8).
+-define(TYPE_DISCONNECT,  9).
+
+-define(loop_recv_and_reply_empty_success(Stream),
+        ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)).
+
+-define(loop_recv_and_reply_empty_success(Stream, Fun),
+        begin
+            LoopRecv = fun _Lp(_St) ->
+                case grpc_stream:recv(_St) of
+                    {more, _Reqs, _NSt} ->
+                        ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]),
+                        Fun(_Reqs), _Lp(_NSt);
+                    {eos, _Reqs, _NSt} ->
+                        ?LOG("~p: ~p~n", [?FUNCTION_NAME, _Reqs]),
+                        Fun(_Reqs), _NSt
+                end
+            end,
+            NStream  = LoopRecv(Stream),
+            grpc_stream:reply(NStream, #{}),
+            {ok, NStream}
+        end).
 
 %%--------------------------------------------------------------------
 %% APIs
@@ -92,47 +115,53 @@ stop([_ChannPid, _SvrPid]) ->
 %% Protocol Adapter callbacks
 %%--------------------------------------------------------------------
 
--spec on_socket_created(emqx_exproto_pb:socket_created_request(), grpc:metadata())
-    -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
-     | {error, grpc_cowboy_h:error_response()}.
-on_socket_created(Req, Md) ->
-    io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]),
-    {ok, #{}, Md}.
-
--spec on_socket_closed(emqx_exproto_pb:socket_closed_request(), grpc:metadata())
-    -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
-     | {error, grpc_cowboy_h:error_response()}.
-on_socket_closed(Req, Md) ->
-    io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]),
-    {ok, #{}, Md}.
-
--spec on_received_bytes(emqx_exproto_pb:received_bytes_request(), grpc:metadata())
-    -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
-     | {error, grpc_cowboy_h:error_response()}.
-on_received_bytes(Req = #{conn := Conn, bytes := Bytes}, Md) ->
-    io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]),
-    #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]),
-    _ = handle_in(Conn, Type, Params),
-    {ok, #{}, Md}.
-
--spec on_timer_timeout(emqx_exproto_pb:timer_timeout_request(), grpc:metadata())
-    -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
-     | {error, grpc_cowboy_h:error_response()}.
-on_timer_timeout(Req = #{conn := Conn, type := 'KEEPALIVE'}, Md) ->
-    io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]),
-    handle_out(Conn, ?TYPE_DISCONNECT),
-    ?close(#{conn => Conn}),
-    {ok, #{}, Md}.
-
--spec on_received_messages(emqx_exproto_pb:received_messages_request(), grpc:metadata())
-    -> {ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
-     | {error, grpc_cowboy_h:error_response()}.
-on_received_messages(Req = #{conn := Conn, messages := Messages}, Md) ->
-    io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]),
-    lists:foreach(fun(Message) ->
-        handle_out(Conn, ?TYPE_PUBLISH, Message)
-    end, Messages),
-    {ok, #{}, Md}.
+-spec on_socket_created(grpc_stream:stream(), grpc:metadata())
+    -> {ok, grpc_stream:stream()}.
+on_socket_created(Stream, _Md) ->
+    ?loop_recv_and_reply_empty_success(Stream).
+
+-spec on_socket_closed(grpc_stream:stream(), grpc:metadata())
+    -> {ok, grpc_stream:stream()}.
+on_socket_closed(Stream, _Md) ->
+    ?loop_recv_and_reply_empty_success(Stream).
+
+-spec on_received_bytes(grpc_stream:stream(), grpc:metadata())
+    -> {ok, grpc_stream:stream()}.
+on_received_bytes(Stream, _Md) ->
+    ?loop_recv_and_reply_empty_success(Stream,
+      fun(Reqs) ->
+        lists:foreach(
+          fun(#{conn := Conn, bytes := Bytes}) ->
+            #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]),
+            _ = handle_in(Conn, Type, Params)
+          end, Reqs)
+      end).
+
+-spec on_timer_timeout(grpc_stream:stream(), grpc:metadata())
+    -> {ok, grpc_stream:stream()}.
+on_timer_timeout(Stream, _Md) ->
+    ?loop_recv_and_reply_empty_success(Stream,
+      fun(Reqs) ->
+        lists:foreach(
+          fun(#{conn := Conn, type := 'KEEPALIVE'}) ->
+            ?LOG("Close this connection ~p due to keepalive timeout", [Conn]),
+            handle_out(Conn, ?TYPE_DISCONNECT),
+            ?close(#{conn => Conn})
+          end, Reqs)
+      end).
+
+-spec on_received_messages(grpc_stream:stream(), grpc:metadata())
+    -> {ok, grpc_stream:stream()}.
+on_received_messages(Stream, _Md) ->
+    ?loop_recv_and_reply_empty_success(Stream,
+      fun(Reqs) ->
+        lists:foreach(
+          fun(#{conn := Conn, messages := Messages}) ->
+            lists:foreach(fun(Message) ->
+                handle_out(Conn, ?TYPE_PUBLISH, Message)
+            end, Messages)
+          end, Reqs)
+      end).
 
 %%--------------------------------------------------------------------
 %% The Protocol Example: