|
|
@@ -289,11 +289,13 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S
|
|
|
handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
- ok -> TopicFilters1 = enrich_subid(Properties, parse_topic_filters(TopicFilters)),
|
|
|
- TopicFilters2 = emqx_hooks:run_fold('client.subscribe',
|
|
|
- [ClientInfo, Properties],
|
|
|
- TopicFilters1),
|
|
|
- {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel),
|
|
|
+ ok -> TopicFilters1 = parse_topic_filters(TopicFilters),
|
|
|
+ TopicFilters2 = enrich_subid(Properties, TopicFilters1),
|
|
|
+ TopicFilters3 = run_hooks('client.subscribe',
|
|
|
+ [ClientInfo, Properties],
|
|
|
+ TopicFilters2
|
|
|
+ ),
|
|
|
+ {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel),
|
|
|
handle_out(suback, {PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
handle_out(disconnect, ReasonCode, Channel)
|
|
|
@@ -302,9 +304,10 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
|
|
|
Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
- ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
- [ClientInfo, Properties],
|
|
|
- parse_topic_filters(TopicFilters)),
|
|
|
+ ok -> TopicFilters1 = run_hooks('client.unsubscribe',
|
|
|
+ [ClientInfo, Properties],
|
|
|
+ parse_topic_filters(TopicFilters)
|
|
|
+ ),
|
|
|
{ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
|
|
|
{error, ReasonCode} ->
|
|
|
@@ -550,15 +553,14 @@ not_nacked({deliver, _Topic, Msg}) ->
|
|
|
| {ok, replies(), channel()}
|
|
|
| {shutdown, Reason :: term(), channel()}
|
|
|
| {shutdown, Reason :: term(), replies(), channel()}).
|
|
|
-handle_out(connack, {RC = ?RC_SUCCESS, SP, ConnPkt},
|
|
|
- Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
+handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
AckProps = run_fold([fun enrich_connack_caps/2,
|
|
|
fun enrich_server_keepalive/2,
|
|
|
fun enrich_assigned_clientid/2
|
|
|
], #{}, Channel),
|
|
|
- AckPacket = ?CONNACK_PACKET(RC, SP, AckProps),
|
|
|
- AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket),
|
|
|
- return_connack(AckPacket1,
|
|
|
+ AckPacket = run_hooks('client.connack', [ConnInfo],
|
|
|
+ ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps)),
|
|
|
+ return_connack(AckPacket,
|
|
|
ensure_keepalive(AckProps,
|
|
|
ensure_connected(ConnPkt, Channel)));
|
|
|
|
|
|
@@ -567,7 +569,7 @@ handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
|
|
|
?MQTT_PROTO_V5 -> ReasonCode;
|
|
|
_ -> emqx_reason_codes:compat(connack, ReasonCode)
|
|
|
end),
|
|
|
- AckPacket1 = emqx_hooks:run_fold('client.connack', [ConnInfo], AckPacket),
|
|
|
+ AckPacket1 = run_hooks('client.connack', [ConnInfo], AckPacket),
|
|
|
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket1, Channel);
|
|
|
|
|
|
%% Optimize?
|
|
|
@@ -732,16 +734,18 @@ handle_call(Req, Channel) ->
|
|
|
-spec(handle_info(Info :: term(), channel())
|
|
|
-> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}).
|
|
|
handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
- TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
|
|
|
- [ClientInfo, #{'Internal' => true}],
|
|
|
- parse_topic_filters(TopicFilters)),
|
|
|
+ TopicFilters1 = run_hooks('client.subscribe',
|
|
|
+ [ClientInfo, #{'Internal' => true}],
|
|
|
+ parse_topic_filters(TopicFilters)
|
|
|
+ ),
|
|
|
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
|
|
|
- TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
|
|
|
- [ClientInfo, #{'Internal' => true}],
|
|
|
- parse_topic_filters(TopicFilters)),
|
|
|
+ TopicFilters1 = run_hooks('client.unsubscribe',
|
|
|
+ [ClientInfo, #{'Internal' => true}],
|
|
|
+ parse_topic_filters(TopicFilters)
|
|
|
+ ),
|
|
|
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
|
|
|
{ok, NChannel};
|
|
|
|
|
|
@@ -940,7 +944,7 @@ receive_maximum(#{zone := Zone}, ConnProps) ->
|
|
|
%% Run Connect Hooks
|
|
|
|
|
|
run_conn_hooks(ConnPkt, Channel = #channel{conninfo = ConnInfo}) ->
|
|
|
- case emqx_hooks:run_fold('client.connect', [ConnInfo], ConnPkt) of
|
|
|
+ case run_hooks('client.connect', [ConnInfo], ConnPkt) of
|
|
|
Error = {error, _Reason} -> Error;
|
|
|
NConnPkt -> {ok, NConnPkt, Channel}
|
|
|
end.
|
|
|
@@ -1183,7 +1187,7 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
|
|
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
|
|
clientinfo = ClientInfo}) ->
|
|
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
|
|
- ok = emqx_hooks:run('client.connected', [ClientInfo, NConnInfo]),
|
|
|
+ ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
|
|
Channel#channel{conninfo = NConnInfo,
|
|
|
conn_state = connected,
|
|
|
will_msg = emqx_packet:will_msg(ConnPkt),
|
|
|
@@ -1262,7 +1266,7 @@ parse_topic_filters(TopicFilters) ->
|
|
|
ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
|
|
clientinfo = ClientInfo}) ->
|
|
|
NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(second)},
|
|
|
- ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
|
|
+ ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
|
|
|
Channel#channel{conninfo = NConnInfo, conn_state = disconnected}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -1292,6 +1296,13 @@ disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode).
|
|
|
%% Helper functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+-compile({inline, [run_hooks/2, run_hooks/3]}).
|
|
|
+run_hooks(Name, Args) ->
|
|
|
+ ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
|
|
|
+
|
|
|
+run_hooks(Name, Args, Acc) ->
|
|
|
+ ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
|
|
|
+
|
|
|
-compile({inline, [find_alias/2, save_alias/3]}).
|
|
|
|
|
|
find_alias(_AliasId, undefined) -> false;
|