Explorar el Código

feat(exproto): add raw_publish function

JianBo He hace 2 años
padre
commit
837dfeb46f

+ 11 - 0
apps/emqx_gateway_exproto/priv/protos/exproto.proto

@@ -41,6 +41,8 @@ service ConnectionAdapter {
   rpc Subscribe(SubscribeRequest) returns (CodeResponse) {};
 
   rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
+
+  rpc RawPublish(RawPublishRequest) returns (CodeResponse) {};
 }
 
 // Deprecated service.
@@ -165,6 +167,15 @@ message PublishRequest {
   bytes payload = 4;
 }
 
+message RawPublishRequest {
+
+  string topic = 1;
+
+  uint32 qos = 2;
+
+  bytes payload = 3;
+}
+
 message SubscribeRequest {
 
   string conn = 1;

+ 23 - 9
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -297,7 +297,9 @@ handle_timeout(
             NChannel = remove_timer_ref(alive_timer, Channel),
             %% close connection if keepalive timeout
             Replies = [{event, disconnected}, {close, keepalive_timeout}],
-            NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{closed_reason = keepalive_timeout}),
+            NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{
+                closed_reason = keepalive_timeout
+            }),
             {ok, Replies, NChannel1}
     end;
 handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
@@ -474,10 +476,21 @@ handle_call(kick, _From, Channel) ->
     {reply, ok, [{event, disconnected}, {close, kicked}], Channel};
 handle_call(discard, _From, Channel) ->
     {shutdown, discarded, ok, Channel};
-handle_call(Req, _From, Channel) ->
+handle_call(
+    Req,
+    _From,
+    Channel = #channel{
+        conn_state = ConnState,
+        clientinfo = ClientInfo,
+        closed_reason = ClosedReason
+    }
+) ->
     ?SLOG(warning, #{
         msg => "unexpected_call",
-        call => Req
+        call => Req,
+        conn_state => ConnState,
+        clientid => maps:get(clientid, ClientInfo, undefined),
+        closed_reason => ClosedReason
     }),
     {reply, {error, unexpected_call}, Channel}.
 
@@ -505,12 +518,13 @@ handle_info(
             {shutdown, Reason, Channel1};
         _ ->
             %% delayed close process for flushing all callback funcs to gRPC server
-            Channel1 = case ClosedReason of
-                           undefined ->
-                               Channel#channel{closed_reason = Reason};
-                           _ ->
-                               Channel
-                       end,
+            Channel1 =
+                case ClosedReason of
+                    undefined ->
+                        Channel#channel{closed_reason = Reason};
+                    _ ->
+                        Channel
+                end,
             Channel2 = ensure_timer(force_timer, Channel1),
             {ok, ensure_disconnected(Reason, Channel2)}
     end;

+ 14 - 0
apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl

@@ -33,6 +33,7 @@
     authenticate/2,
     start_timer/2,
     publish/2,
+    raw_publish/2,
     subscribe/2,
     unsubscribe/2
 ]).
@@ -129,6 +130,19 @@ publish(Req, Md) ->
     }),
     {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
 
+-spec raw_publish(emqx_exproto_pb:raw_publish_request(), grpc:metadata()) ->
+    {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
+    | {error, grpc_stream:error_response()}.
+raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) ->
+    ?SLOG(debug, #{
+        msg => "recv_grpc_function_call",
+        function => ?FUNCTION_NAME,
+        request => Req
+    }),
+    Msg = emqx_message:make(exproto, Qos, Topic, Payload),
+    _ = emqx_broker:safe_publish(Msg),
+    {ok, response(ok), Md}.
+
 -spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) ->
     {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
     | {error, grpc_cowboy_h:error_response()}.

+ 36 - 0
apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

@@ -31,6 +31,7 @@
         frame_connect/2,
         frame_connack/1,
         frame_publish/3,
+        frame_raw_publish/3,
         frame_puback/1,
         frame_subscribe/2,
         frame_suback/1,
@@ -86,6 +87,7 @@ groups() ->
     MainCases = [
         t_keepalive_timeout,
         t_mountpoint_echo,
+        t_raw_publish,
         t_auth_deny,
         t_acl_deny,
         t_hook_connected_disconnected,
@@ -229,6 +231,40 @@ t_mountpoint_echo(Cfg) ->
     end,
     close(Sock).
 
+t_raw_publish(Cfg) ->
+    SockType = proplists:get_value(listener_type, Cfg),
+    Sock = open(SockType),
+
+    Client = #{
+        proto_name => <<"demo">>,
+        proto_ver => <<"v0.1">>,
+        clientid => <<"test_client_1">>,
+        mountpoint => <<"ct/">>
+    },
+    Password = <<"123456">>,
+
+    ConnBin = frame_connect(Client, Password),
+    ConnAckBin = frame_connack(0),
+
+    send(Sock, ConnBin),
+    {ok, ConnAckBin} = recv(Sock, 5000),
+
+    PubBin2 = frame_raw_publish(<<"t/up">>, 0, <<"echo">>),
+    PubAckBin = frame_puback(0),
+
+    %% mountpoint is not used in raw publish
+    emqx:subscribe(<<"t/up">>),
+
+    send(Sock, PubBin2),
+    {ok, PubAckBin} = recv(Sock, 5000),
+
+    receive
+        {deliver, _, _} -> ok
+    after 1000 ->
+        error(echo_not_running)
+    end,
+    close(Sock).
+
 t_auth_deny(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),

+ 25 - 1
apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl

@@ -27,6 +27,7 @@
     frame_connect/2,
     frame_connack/1,
     frame_publish/3,
+    frame_raw_publish/3,
     frame_puback/1,
     frame_subscribe/2,
     frame_suback/1,
@@ -55,6 +56,7 @@
 -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})).
 -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})).
 -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})).
+-define(raw_publish(Req), ?CLIENT:raw_publish(Req, #{channel => ct_test_channel})).
 -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})).
 -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})).
 
@@ -67,6 +69,7 @@
 -define(TYPE_UNSUBSCRIBE, 7).
 -define(TYPE_UNSUBACK, 8).
 -define(TYPE_DISCONNECT, 9).
+-define(TYPE_RAW_PUBLISH, 10).
 
 -define(loop_recv_and_reply_empty_success(Stream),
     ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end)
@@ -267,6 +270,17 @@ handle_in(Conn, ?TYPE_PUBLISH, #{
         _ ->
             handle_out(Conn, ?TYPE_PUBACK, 1)
     end;
+handle_in(Conn, ?TYPE_RAW_PUBLISH, #{
+    <<"topic">> := Topic,
+    <<"qos">> := Qos,
+    <<"payload">> := Payload
+}) ->
+    case ?raw_publish(#{topic => Topic, qos => Qos, payload => Payload}) of
+        {ok, #{code := 'SUCCESS'}, _} ->
+            handle_out(Conn, ?TYPE_PUBACK, 0);
+        _ ->
+            handle_out(Conn, ?TYPE_PUBACK, 1)
+    end;
 handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) ->
     case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of
         {ok, #{code := 'SUCCESS'}, _} ->
@@ -293,7 +307,9 @@ handle_out(Conn, ?TYPE_SUBACK, Code) ->
 handle_out(Conn, ?TYPE_UNSUBACK, Code) ->
     ?send(#{conn => Conn, bytes => frame_unsuback(Code)});
 handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) ->
-    ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}).
+    ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)});
+handle_out(Conn, ?TYPE_RAW_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) ->
+    ?send(#{conn => Conn, bytes => frame_raw_publish(Topic, Qos, Payload)}).
 
 handle_out(Conn, ?TYPE_DISCONNECT) ->
     ?send(#{conn => Conn, bytes => frame_disconnect()}).
@@ -318,6 +334,14 @@ frame_publish(Topic, Qos, Payload) ->
         payload => Payload
     }).
 
+frame_raw_publish(Topic, Qos, Payload) ->
+    emqx_utils_json:encode(#{
+        type => ?TYPE_RAW_PUBLISH,
+        topic => Topic,
+        qos => Qos,
+        payload => Payload
+    }).
+
 frame_puback(Code) ->
     emqx_utils_json:encode(#{type => ?TYPE_PUBACK, code => Code}).
 

+ 1 - 0
apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl

@@ -50,6 +50,7 @@
 -define(TYPE_UNSUBSCRIBE, 7).
 -define(TYPE_UNSUBACK, 8).
 -define(TYPE_DISCONNECT, 9).
+-define(TYPE_RAW_PUBLISH, 10).
 
 %%--------------------------------------------------------------------
 %% callbacks