emqx_shared_sub.erl 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_shared_sub).
  17. -behaviour(gen_server).
  18. -include("emqx.hrl").
  19. -include("emqx_mqtt.hrl").
  20. -include("logger.hrl").
  21. -include("types.hrl").
  22. %% Mnesia bootstrap
  23. -export([mnesia/1]).
  24. -boot_mnesia({mnesia, [boot]}).
  25. %% APIs
  26. -export([start_link/0]).
  27. -export([
  28. subscribe/3,
  29. unsubscribe/3
  30. ]).
  31. -export([
  32. dispatch/3,
  33. dispatch/4
  34. ]).
  35. -export([
  36. maybe_ack/1,
  37. maybe_nack_dropped/1,
  38. nack_no_connection/1,
  39. is_ack_required/1,
  40. get_group/1
  41. ]).
  42. %% for testing
  43. -ifdef(TEST).
  44. -export([
  45. subscribers/2,
  46. strategy/1
  47. ]).
  48. -endif.
  49. %% gen_server callbacks
  50. -export([
  51. init/1,
  52. handle_call/3,
  53. handle_cast/2,
  54. handle_info/2,
  55. terminate/2,
  56. code_change/3
  57. ]).
  58. %% Internal exports (RPC)
  59. -export([
  60. init_monitors/0
  61. ]).
  -export_type([strategy/0]).
  %% Dispatch strategy used to choose one subscriber of a shared-subscription
  %% group for each message (see pick/6 and do_pick_subscriber/6).
  -type strategy() ::
      random
      | round_robin
      | round_robin_per_group
      | sticky
      | local
      %% same as hash_clientid, backward compatible
      | hash
      | hash_clientid
      | hash_topic.
  %% Locally registered name of this gen_server.
  -define(SERVER, ?MODULE).
  %% mria table holding #emqx_shared_subscription{} records.
  -define(TAB, emqx_shared_subscription).
  %% ETS counter table keyed by {Group, Topic}, used by round_robin_per_group.
  -define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter).
  %% ETS bag of {{Group, Topic}, SubPid} written by this server on subscribe.
  -define(SHARED_SUBS, emqx_shared_subscriber).
  %% ETS set of {Pid} entries for non-local subscriber pids considered alive.
  -define(ALIVE_SUBS, emqx_alive_shared_subscribers).
  %% How long dispatch_with_ack/4 waits for a QoS 1 ack, in seconds.
  -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
  %% True when Pid is a pid that lives on the current node.
  -define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
  %% Payloads of the {Ref, Payload} replies sent back to the dispatching process.
  -define(ACK, shared_sub_ack).
  -define(NACK(Reason), {shared_sub_nack, Reason}).
  %% Value of the shared_dispatch_ack header when no ack is expected.
  -define(NO_ACK, no_ack).
  %% Server state: pmon is the emqx_pmon monitor set of subscriber pids.
  -record(state, {pmon}).
  %% One row per (group, topic, subscriber pid).
  -record(emqx_shared_subscription, {group, topic, subpid}).
  85. %%--------------------------------------------------------------------
  86. %% Mnesia bootstrap
  87. %%--------------------------------------------------------------------
  %% @doc Mnesia bootstrap hook: creates the shared-subscription table via mria
  %% as a RAM-only bag of #emqx_shared_subscription{} records.
  mnesia(boot) ->
      TableOpts = [
          {type, bag},
          {rlog_shard, ?SHARED_SUB_SHARD},
          {storage, ram_copies},
          {record_name, emqx_shared_subscription},
          {attributes, record_info(fields, emqx_shared_subscription)}
      ],
      ok = mria:create_table(?TAB, TableOpts).
  96. %%--------------------------------------------------------------------
  97. %% API
  98. %%--------------------------------------------------------------------
  %% @doc Start the shared-subscription server, registered locally as ?SERVER.
  -spec start_link() -> startlink_ret().
  start_link() ->
      ServerName = {local, ?SERVER},
      gen_server:start_link(ServerName, ?MODULE, [], []).
  %% @doc Register SubPid as a member of Group for Topic.
  %% Synchronous call into the local server process.
  -spec subscribe(emqx_types:group(), emqx_types:topic(), pid()) -> ok.
  subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
      Request = {subscribe, Group, Topic, SubPid},
      gen_server:call(?SERVER, Request).
  %% @doc Remove SubPid from Group for Topic.
  %% Synchronous call into the local server process.
  -spec unsubscribe(emqx_types:group(), emqx_types:topic(), pid()) -> ok.
  unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
      Request = {unsubscribe, Group, Topic, SubPid},
      gen_server:call(?SERVER, Request).
  %% Build the table row describing one shared subscription.
  record(Group, Topic, SubPid) ->
      #emqx_shared_subscription{
          subpid = SubPid,
          topic = Topic,
          group = Group
      }.
  %% @doc Dispatch a delivery to one subscriber of Group/Topic, starting with
  %% an empty list of subscribers that already failed.
  -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
      emqx_types:deliver_result().
  dispatch(Group, Topic, Delivery) ->
      NoFailedSubs = [],
      dispatch(Group, Topic, Delivery, NoFailedSubs).
  %% @doc Pick a subscriber according to the group's strategy and deliver to it.
  %% FailedSubs lists subscribers that already rejected this delivery; each
  %% failed attempt adds the subscriber to that list and picks again.
  dispatch(Group, Topic, #delivery{message = Msg} = Delivery, FailedSubs) ->
      #message{from = ClientId, topic = SourceTopic} = Msg,
      Picked = pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs),
      dispatch_to_picked(Picked, Group, Topic, Delivery, FailedSubs).

  %% Deliver to the picked subscriber, or report that the group is empty.
  dispatch_to_picked(false, _Group, _Topic, _Delivery, _FailedSubs) ->
      {error, no_subscribers};
  dispatch_to_picked({Type, SubPid}, Group, Topic, Delivery, FailedSubs) ->
      #delivery{message = Msg} = Delivery,
      case do_dispatch(SubPid, Group, Topic, Msg, Type) of
          ok ->
              {ok, 1};
          {error, _Reason} ->
              %% This subscriber did not accept the message, try the next one.
              dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
      end.
  %% @doc Resolve the dispatch strategy for Group: the per-group setting when
  %% configured, otherwise the broker-wide default.
  -spec strategy(emqx_topic:group()) -> strategy().
  strategy(Group) ->
      GroupKey = [broker, shared_subscription_group, Group, strategy],
      case emqx:get_config(GroupKey, undefined) of
          undefined ->
              emqx:get_config([broker, shared_subscription_strategy]);
          GroupStrategy ->
              GroupStrategy
      end.
  %% Read the broker setting that turns acknowledged shared dispatch on or off.
  -spec ack_enabled() -> boolean().
  ack_enabled() ->
      ConfKey = [broker, shared_dispatch_ack_enabled],
      emqx:get_config(ConfKey).
  %% Deliver Msg to SubPid; returns 'ok' or {error, Reason}.
  %% Clause order matters: self-delivery, QoS 0 and retries never wait for an ack.
  do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
      %% The subscriber is the dispatching process itself: waiting for its own
      %% ack would deadlock, so deliver without one.
      shared_deliver_no_ack(SubPid, Topic, Msg);
  do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
      %% QoS 0 messages go out as a regular dispatch.
      shared_deliver_no_ack(SubPid, Topic, Msg);
  do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
      %% A retry means every subscriber nack:ed; send again without ack.
      shared_deliver_no_ack(SubPid, Topic, Msg);
  do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
      case ack_enabled() of
          false ->
              shared_deliver_no_ack(SubPid, Topic, Msg);
          true ->
              dispatch_with_ack(SubPid, Group, Topic, Msg)
      end.

  %% Fire-and-forget delivery to the subscriber process.
  shared_deliver_no_ack(SubPid, Topic, Msg) ->
      SubPid ! {deliver, Topic, Msg},
      ok.
  %% Deliver a QoS 1/2 message and block until the subscriber acks, nacks,
  %% dies, or (QoS 1 only) the wait times out. QoS 2 waits without a time limit.
  %% Returns 'ok' on ack, {error, Reason} otherwise.
  dispatch_with_ack(SubPid, Group, Topic, Msg) ->
      MRef = erlang:monitor(process, SubPid),
      %% The monitor reference doubles as the correlation id of the ack.
      AckMsg = with_group_ack(Msg, Group, self(), MRef),
      SubPid ! {deliver, Topic, AckMsg},
      WaitTime =
          case Msg#message.qos of
              ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
              ?QOS_2 -> infinity
          end,
      try
          receive
              {MRef, ?ACK} ->
                  ok;
              {MRef, ?NACK(NackReason)} ->
                  %% the receiving session may nack when its queue is full
                  {error, NackReason};
              {'DOWN', MRef, process, SubPid, DownReason} ->
                  {error, DownReason}
          after WaitTime ->
              {error, timeout}
          end
      after
          %% Always drop the monitor, whatever the outcome.
          ok = emqx_pmon:demonitor(MRef)
      end.
  %% Tag Msg with the shared_dispatch_ack header so the receiver knows which
  %% process (Sender) and reference (Ref) to answer.
  with_group_ack(Msg, Group, Sender, Ref) ->
      AckHeader = #{shared_dispatch_ack => {Group, Sender, Ref}},
      emqx_message:set_headers(AckHeader, Msg).
  %% Mark Msg as not expecting a shared-dispatch ack.
  -spec without_group_ack(emqx_types:message()) -> emqx_types:message().
  without_group_ack(Msg) ->
      NoAckHeader = #{shared_dispatch_ack => ?NO_ACK},
      emqx_message:set_headers(NoAckHeader, Msg).
  %% Read the shared_dispatch_ack header; ?NO_ACK when the header is absent.
  get_group_ack(Msg) ->
      Default = ?NO_ACK,
      emqx_message:get_header(shared_dispatch_ack, Msg, Default).
  %% @doc True when Msg carries ack information, i.e. the dispatcher expects a reply.
  -spec is_ack_required(emqx_types:message()) -> boolean().
  is_ack_required(Msg) ->
      get_group_ack(Msg) =/= ?NO_ACK.
  %% @doc Return {ok, Group} from the message's ack header, or 'error' when the
  %% message carries no ack information.
  -spec get_group(emqx_types:message()) -> {ok, any()} | error.
  get_group(Msg) ->
      AckInfo = get_group_ack(Msg),
      case AckInfo of
          {Group, _Sender, _Ref} -> {ok, Group};
          ?NO_ACK -> error
      end.
  %% @doc Negatively acknowledge a message that was dropped because the inflight
  %% window or the message queue is full. Returns false when no ack was expected.
  -spec maybe_nack_dropped(emqx_types:message()) -> boolean().
  maybe_nack_dropped(Msg) ->
      case get_group_ack(Msg) of
          {_Group, Sender, Ref} ->
              Result = nack(Sender, Ref, dropped),
              Result =:= ok;
          ?NO_ACK ->
              false
      end.
  %% @doc Negatively acknowledge a message because the connection is down.
  %% Must only be called when is_ack_required/1 returned true for Msg;
  %% otherwise the tuple match below fails.
  -spec nack_no_connection(emqx_types:message()) -> ok.
  nack_no_connection(Msg) ->
      AckInfo = get_group_ack(Msg),
      {_Group, Sender, Ref} = AckInfo,
      nack(Sender, Ref, no_connection).
  %% Send a negative ack tagged with Ref back to the dispatching process.
  -spec nack(pid(), reference(), dropped | no_connection) -> ok.
  nack(Sender, Ref, Reason) ->
      Reply = {Ref, ?NACK(Reason)},
      Sender ! Reply,
      ok.
  %% @doc Acknowledge Msg to the dispatcher if an ack is expected and return the
  %% message with the ack header cleared; otherwise return Msg unchanged.
  -spec maybe_ack(emqx_types:message()) -> emqx_types:message().
  maybe_ack(Msg) ->
      case get_group_ack(Msg) of
          {_Group, Sender, Ref} ->
              Sender ! {Ref, ?ACK},
              without_group_ack(Msg);
          ?NO_ACK ->
              Msg
      end.
  %% Pick a subscriber for Group/Topic according to Strategy.
  %% Returns {fresh, Pid} for a subscriber that has not failed yet,
  %% {retry, Pid} when every subscriber already failed, or 'false' when the
  %% group has no subscriber at all.
  %%
  %% The sticky strategy remembers its choice in the process dictionary of the
  %% dispatching process and reuses it while that subscriber is alive and has
  %% not failed for the current delivery.
  pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
      StickyKey = {shared_sub_sticky, Group, Topic},
      Sub0 = erlang:get(StickyKey),
      case is_active_sub(Sub0, FailedSubs) of
          true ->
              %% the old subscriber is still alive
              %% keep using it for sticky strategy
              {fresh, Sub0};
          false ->
              %% randomly pick one for the first message
              case do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]) of
                  false ->
                      %% No subscriber left: report it instead of crashing on a
                      %% badmatch, so the caller can return no_subscribers.
                      false;
                  {_Type, Sub} = Picked ->
                      %% stick to whatever pick result
                      _ = erlang:put(StickyKey, Sub),
                      Picked
              end
      end;
  pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
      do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
  %% Choose among the current subscribers of Group/Topic, skipping FailedSubs.
  %% Returns 'false' for an empty group, {retry, Pid} when all subscribers have
  %% already failed, and {fresh, Pid} otherwise.
  do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
      All = subscribers(Group, Topic),
      case {All, All -- FailedSubs} of
          {[], _} ->
              %% Genuinely no subscriber
              false;
          {_, []} ->
              %% Everyone failed already; pick one anyway
              {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
          {_, Candidates} ->
              %% At least one subscriber has not failed yet
              {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Candidates)}
      end.
  %% Select one pid out of a non-empty subscriber list.
  %% A single candidate is returned directly. The 'local' strategy picks
  %% randomly among subscribers on this node and falls back to all of them.
  pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [OnlySub]) ->
      OnlySub;
  pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) ->
      ThisNode = node(),
      case [Pid || Pid <- Subs, erlang:node(Pid) =:= ThisNode] of
          [] ->
              pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs);
          LocalSubs ->
              pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs)
      end;
  pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
      Total = length(Subs),
      Index = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Total),
      lists:nth(Index, Subs).
  %% Compute the 1-based index of the subscriber to use, given Count candidates.
  do_pick_subscriber(_Group, _Topic, random, _ClientId, _SourceTopic, Count) ->
      rand:uniform(Count);
  do_pick_subscriber(_Group, _Topic, hash, ClientId, _SourceTopic, Count) ->
      %% backward compatible alias of hash_clientid
      1 + (erlang:phash2(ClientId) rem Count);
  do_pick_subscriber(_Group, _Topic, hash_clientid, ClientId, _SourceTopic, Count) ->
      1 + (erlang:phash2(ClientId) rem Count);
  do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) ->
      1 + (erlang:phash2(SourceTopic) rem Count);
  do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) ->
      %% The position is kept in the process dictionary of the dispatching
      %% process and starts at a random offset.
      Key = {shared_sub_round_robin, Group, Topic},
      Next =
          case erlang:get(Key) of
              undefined -> rand:uniform(Count) - 1;
              Prev -> (Prev + 1) rem Count
          end,
      _ = erlang:put(Key, Next),
      Next + 1;
  do_pick_subscriber(Group, Topic, round_robin_per_group, _ClientId, _SourceTopic, Count) ->
      %% Shared counter: add 1 and wrap back to 1 once the value would exceed
      %% Count, so it never grows beyond the current subscriber count.
      %% If no counter exists for this group/topic (e.g. after a configuration
      %% change), one is created starting at 0.
      Key = {Group, Topic},
      UpdateOp = {2, 1, Count, 1},
      Default = {Key, 0},
      ets:update_counter(?SHARED_SUBS_ROUND_ROBIN_COUNTER, Key, UpdateOp, Default).
  %% List the subscriber pids stored in ?TAB for Group/Topic.
  subscribers(Group, Topic) ->
      MatchHead = #emqx_shared_subscription{group = Group, topic = Topic, subpid = '$1'},
      ets:select(?TAB, [{MatchHead, [], ['$1']}]).
  292. %%--------------------------------------------------------------------
  293. %% gen_server callbacks
  294. %%--------------------------------------------------------------------
  %% gen_server init: wait for the table, subscribe to its events, monitor the
  %% subscribers already recorded, then create the server's ETS tables.
  init([]) ->
      ok = mria:wait_for_tables([?TAB]),
      {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
      {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
      EtsTables = [
          {?SHARED_SUBS, [protected, bag]},
          {?ALIVE_SUBS, [protected, set, {read_concurrency, true}]},
          {?SHARED_SUBS_ROUND_ROBIN_COUNTER, [public, set, {write_concurrency, true}]}
      ],
      lists:foreach(
          fun({Tab, Opts}) -> ok = emqx_tables:new(Tab, Opts) end,
          EtsTables
      ),
      InitState = #state{pmon = PMon},
      {ok, update_stats(InitState)}.
  %% Fold over every stored subscription and monitor its pid, returning the
  %% resulting emqx_pmon monitor set.
  init_monitors() ->
      MonitorSub =
          fun(#emqx_shared_subscription{subpid = SubPid}, PMon) ->
              emqx_pmon:monitor(SubPid, PMon)
          end,
      mnesia:foldl(MonitorSub, emqx_pmon:new(), ?TAB).
  %% gen_server call handler.
  %%
  %% {subscribe, Group, Topic, SubPid}: store the subscription, add the route
  %% for {Group, node()} when this is the first local subscriber of Group/Topic,
  %% start monitoring the pid and refresh the stats.
  %%
  %% {unsubscribe, Group, Topic, SubPid}: remove the subscription, drop the
  %% route and the round-robin counter when no local subscriber remains, and
  %% refresh the stats.
  %%
  %% Any other request is logged and answered with 'ignored'.
  handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
      mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
      %% Membership is checked before the insert below, so the route is only
      %% added for the first local subscriber of this group/topic.
      case ets:member(?SHARED_SUBS, {Group, Topic}) of
          true -> ok;
          false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
      end,
      ok = maybe_insert_alive_tab(SubPid),
      ok = maybe_insert_round_robin_count({Group, Topic}),
      true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
      {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
  handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
      mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
      true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
      delete_route_if_needed({Group, Topic}),
      maybe_delete_round_robin_count({Group, Topic}),
      %% Refresh the subscription count here too, as the subscribe and 'DOWN'
      %% paths do, so the stat does not stay stale after an unsubscribe.
      {reply, ok, update_stats(State)};
  handle_call(Req, _From, State) ->
      ?SLOG(error, #{msg => "unexpected_call", req => Req}),
      {reply, ignored, State}.
  %% No casts are expected by this server; log and ignore them.
  handle_cast(Cast, State) ->
      ?SLOG(error, #{msg => "unexpected_cast", req => Cast}),
      {noreply, State}.
  %% gen_server info handler.
  %%
  %% Table write events: start monitoring the subscriber pid and, when it is
  %% not a local pid, record it in ?ALIVE_SUBS so that is_alive_sub/1 can
  %% recognise it.
  %%
  %% 'DOWN' messages: clean up everything recorded for the dead subscriber.
  handle_info(
      {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}},
      State = #state{pmon = PMon}
  ) ->
      %% No-op for local pids; non-local pids are only learnt through this event.
      ok = maybe_insert_alive_tab(SubPid),
      {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
  %% delete_object events are deliberately not handled by demonitoring:
  %% the subscriber may have subscribed to multiple topics, so the pid must stay
  %% monitored until it unsubscribes from the last one. The monitor is only
  %% released when the subscriber process goes down.
  handle_info({mnesia_table_event, _Event}, State) ->
      {noreply, State};
  handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
      ?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}),
      cleanup_down(SubPid),
      {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
  handle_info(_Info, State) ->
      {noreply, State}.
  %% Stop receiving events of the shared-subscription table on shutdown.
  terminate(_Reason, _State) ->
      EventCategory = {table, ?TAB, simple},
      mnesia:unsubscribe(EventCategory).
  %% Hot code upgrade: the state needs no conversion.
  code_change(_FromVsn, State, _Extras) ->
      {ok, State}.
  357. %%--------------------------------------------------------------------
  358. %% Internal functions
  359. %%--------------------------------------------------------------------
  %% When the group uses round_robin_per_group, (re)set its counter to 0.
  %% Always returns ok.
  maybe_insert_round_robin_count({Group, _Topic} = GroupTopic) ->
      case strategy(Group) of
          round_robin_per_group ->
              _ = ets:insert(?SHARED_SUBS_ROUND_ROBIN_COUNTER, {GroupTopic, 0}),
              ok;
          _OtherStrategy ->
              ok
      end.
  %% When the group uses round_robin_per_group and no local subscriber is left
  %% for the group/topic, delete its counter. Always returns ok.
  maybe_delete_round_robin_count({Group, _Topic} = GroupTopic) ->
      case strategy(Group) of
          round_robin_per_group ->
              DropCounter =
                  fun() ->
                      ets:delete(?SHARED_SUBS_ROUND_ROBIN_COUNTER, GroupTopic)
                  end,
              if_no_more_subscribers(GroupTopic, DropCounter);
          _OtherStrategy ->
              ok
      end.
  %% Run Fn only when ?SHARED_SUBS holds no entry for GroupTopic.
  %% The result of Fn is discarded; always returns ok.
  if_no_more_subscribers(GroupTopic, Fn) ->
      HasSubscribers = ets:member(?SHARED_SUBS, GroupTopic),
      HasSubscribers orelse Fn(),
      ok.
  %% Keep track of alive remote pids: local pids are skipped, any other pid is
  %% recorded in ?ALIVE_SUBS.
  maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
      case node(Pid) =:= node() of
          true ->
              ok;
          false ->
              _ = ets:insert(?ALIVE_SUBS, {Pid}),
              ok
      end.
  %% Remove every trace of a dead subscriber: its ?ALIVE_SUBS entry (non-local
  %% pids only), its rows in ?TAB and ?SHARED_SUBS, and the round-robin counters
  %% and routes that no longer have a local subscriber.
  cleanup_down(SubPid) ->
      case ?IS_LOCAL_PID(SubPid) of
          true ->
              ok;
          false ->
              _ = ets:delete(?ALIVE_SUBS, SubPid),
              ok
      end,
      Pattern = #emqx_shared_subscription{_ = '_', subpid = SubPid},
      Records = mnesia:dirty_match_object(Pattern),
      DropRecord =
          fun(#emqx_shared_subscription{topic = Topic, group = Group} = Record) ->
              GroupTopic = {Group, Topic},
              ok = mria:dirty_delete_object(?TAB, Record),
              true = ets:delete_object(?SHARED_SUBS, {GroupTopic, SubPid}),
              maybe_delete_round_robin_count(GroupTopic),
              delete_route_if_needed(GroupTopic)
          end,
      lists:foreach(DropRecord, Records).
  %% Publish the current size of ?TAB as the shared-subscription count/max stat
  %% and hand the state back unchanged, so calls can be chained.
  update_stats(State) ->
      Size = ets:info(?TAB, size),
      emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', Size),
      State.
  %% True only when the subscriber process is alive AND it is not listed in
  %% FailedSubs.
  is_active_sub(Pid, FailedSubs) ->
      case is_alive_sub(Pid) of
          true -> not lists:member(Pid, FailedSubs);
          false -> false
      end.
  %% Liveness check for a subscriber. erlang:is_process_alive/1 only works for
  %% local pids; anything else is looked up in ?ALIVE_SUBS.
  is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
      erlang:is_process_alive(Pid);
  is_alive_sub(Pid) ->
      ets:member(?ALIVE_SUBS, Pid).
  %% Delete the {Group, node()} route of Topic once no local subscriber is left
  %% for the group/topic.
  delete_route_if_needed({Group, Topic} = GroupTopic) ->
      DeleteRoute =
          fun() ->
              ok = emqx_router:do_delete_route(Topic, {Group, node()})
          end,
      if_no_more_subscribers(GroupTopic, DeleteRoute).