本次主要走一下客户端创建连接和接收数据的流程。
接上篇,创建Socket成功后,回调函数:emqx_connection, start_link, [Options -- SockOpts]。
emqx_connection.erl:
本模块为一个gen_server模块,所以它会给每一个客户端启动一个进程,并在初始化时,从acceptor接管Socket套接字。
init callback:
init(Parent, Transport, RawSocket, Options) ->
case Transport:wait(RawSocket) of
{ok, Socket} ->
run_loop(Parent, init_state(Transport, Socket, Options));
{error, Reason} ->
ok = Transport:fast_close(RawSocket),
exit_on_sock_error(Reason)
end.
这里重点函数有两个:init_state/3和run_loop/2
init_state,顾名思义,是将进程state中的数据或对象初始化,其中主要有套接字信息、Frame、Parse、Channel等相关帮助模块的初始化、GC初始化等等。
run_loop,处理Socket数据的轮询函数:
- 通过activate_socket,设置允许接收数据包的个数,有效的控制接收流量
- 调用hibernate/2
hibernate(Parent, State) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
这里重点查了一下proc_lib:hibeernate的用法,类似erlang:hibernate,意思是,使调用进程处于等待状态,当有数据接收时,唤醒并调用MFA。
- wakeup_from_hib/2
其中是一个receive的流程,处理收到的数据:
receive
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
{'EXIT', Parent, Reason} ->
terminate(Reason, State);
Msg ->
process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
after
IdleTimeout ->
hibernate(Parent, cancel_stats_timer(State))
end.
- process_msg:
主要处理分包,解MQTT协议包。并将完整的解析后数据,交给channel处理。
%%--------------------------------------------------------------------
%% Handle incoming packet
handle_incoming(Packet, State) when is_record(Packet, mqtt_packet) ->
ok = inc_incoming_stats(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
with_channel(handle_in, [Packet], State);
handle_incoming(FrameError, State) ->
with_channel(handle_in, [FrameError], State).
%%--------------------------------------------------------------------
%% With Channel
with_channel(Fun, Args, State = #state{channel = Channel}) ->
case erlang:apply(emqx_channel, Fun, Args ++ [Channel]) of
ok -> {ok, State};
{ok, NChannel} ->
{ok, State#state{channel = NChannel}};
{ok, Replies, NChannel} ->
{ok, next_msgs(Replies), State#state{channel = NChannel}};
{shutdown, Reason, NChannel} ->
shutdown(Reason, State#state{channel = NChannel});
{shutdown, Reason, Packet, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(Packet, NState),
shutdown(Reason, NState)
end.
emqx_channel.erl & emqx_session.erl:
为什么将这两个放一起说呢,是因为他们俩是配合做事的。
主要是处理MQTT的各种协议包了:CONNECT,SUBSCRIBE,PUBLISH,UNSUB,DISCONN等等。有兴趣的同学可以深入进去,看看每一个协议包的处理流程,本次就不再赘述了。
截取connect的流程性代码片段:
handle_in:
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
case pipeline([fun enrich_conninfo/2,
fun check_connect/2,
fun enrich_client/2,
fun set_logger_meta/2,
fun check_banned/2,
fun auth_connect/2
], ConnPkt, Channel#channel{conn_state = connecting}) of
{ok, NConnPkt, NChannel} ->
process_connect(NConnPkt, NChannel);
{error, ReasonCode, NChannel} ->
handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
end;
process_connect:
%%--------------------------------------------------------------------
%% Process Connect
%%--------------------------------------------------------------------
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
{ok, #{session := Session, present := true, pendings := Pendings}} ->
%%TODO: improve later.
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
NChannel = Channel#channel{session = Session,
resuming = true,
pendings = Pendings1
},
handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
{error, client_id_unavailable} ->
handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
{error, Reason} ->
?LOG(error, "Failed to open session due to ~p", [Reason]),
handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
end.
可以简单看出来:
- 收到connect请求后,会尝试建立session数据
- connect的返回结果,是调用handle_out
-spec(handle_out(atom(), term(), channel())
-> {ok, channel()}
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}
| {shutdown, Reason :: term(), replies(), channel()}).
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel) ->
AckProps = run_fold([fun enrich_connack_caps/2,
fun enrich_server_keepalive/2,
fun enrich_assigned_clientid/2
], #{}, Channel),
AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps),
return_connack(AckPacket,
ensure_keepalive(AckProps,
ensure_connected(ConnPkt, Channel)));
handle_out(connack, {ReasonCode, _ConnPkt},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ok = emqx_hooks:run('client.connected', [ClientInfo, ReasonCode, ConnInfo]),
AckPacket = ?CONNACK_PACKET(
case maps:get(proto_ver, ConnInfo) of
?MQTT_PROTO_V5 -> ReasonCode;
_Other -> emqx_reason_codes:compat(connack, ReasonCode)
end),
shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel);