skynet call的实现--服务与服务的交互

在实现业务逻辑的过程中,各个逻辑一般会抽象成一个服务,例如游戏中的登录服务,访问数据库服务,创建房间服务等等.服务与服务之间肯定是要通信和交互的,例如登录的时候要请求数据库验证.skynet是怎么实现两个服务的通信和交互的呢?为了突出主干,我简化了lua服务代码,A服务调用B服务:

A服务代码(bootstrap.lua):

skynet.start(function()
    local addr = c.command("LAUNCH", "snlua main")
    skynet.name(".main", addr)
    for i = 1, 100000 do              --为了保证调用B服务时,该服务已启动
        table.insert({}, 123)
    end
    local kv = skynet.call('.main',"lua","get", 'name')
    print('kv is ', kv)
end)

B服务代码(main.lua)

local skynet = require "skynet"

local command = {name = 'shonm'}

function command.get(key)
    return command[key]
end

skynet.start(function()
    skynet.dispatch("lua", function(session, address, cmd, ...)
        local f = command[cmd]      
        if f then
            skynet.ret(skynet.pack(f(...))) --回应消息
        else
            error(string.format("Unknown command %s", tostring(cmd)))
        end
    end)
end)

入口函数如何执行,上篇skynet lua业务逻辑的执行我们已经讲过了.这篇主要讲解A服务调用call(B)时,代码的执行过程.

skynet.call的实现为:

function skynet.call(addr, typename, ...)
    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))       --①
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))                  --②
end

① 调用c接口给B服务的地址发送消息,即向对方消息队列push消息,并且返回一个session.注意该消息结构里也是这个session字段的,他的作用下面将会讲到.有消息是经过pack打包的,关于消息怎么打包再另外讲解.

② 是消息的发送者(服务A)接下来执行的代码,yield_cal的实现为:

local function yield_call(service, session)
    watching_session[session] = service
    local succ, msg, sz = coroutine_yield("CALL", session)    --①
    watching_session[session] = nil
    if not succ then
        error "call failed"
    end
    return msg,sz
end

①的意图很明显,暂停协程,传入的参数是'CALL',和消息的session.

上篇中提到raw_dispatch_message中suspend(co, coroutine.resume(co, true, msg, sz)),这里resume会返回'CALL',session,并传给suspend函数,即suspend(co, result, 'CALL', session)(还没有搞懂协程之间是怎么传参数的童鞋请参考lua协程相关参数与返回值),result为resume返回的协程状态,正常为true. suspend的简化实现为:

function suspend(co, result, command, param, size)
    --print(' command is ', command, ' ', string.format('%0x', skynet.self()))
    if not result then
        ...
        error(debug.traceback(co,tostring(command)))
    end
    if command == "CALL" then
        session_id_coroutine[param] = co
    elseif command == "SLEEP" then
        session_id_coroutine[param] = co
        sleep_session[co] = param
    elseif command == "RETURN" then
        local co_session = session_coroutine_id[co]
        local co_address = session_coroutine_address[co]

        session_response[co] = true
        ...
                ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
                    return suspend(co, coroutine.resume(co, ret))
    elseif command == "RESPONSE" then
        ...
        return suspend(co, coroutine.resume(co, response))
    elseif command == "EXIT" then
        ...
    elseif command == "QUIT" then
        -- service exit
        return
    elseif command == nil then
        -- debug trace
        return
    else
        error("Unknown command : " .. command .. "\n" .. debug.traceback(co))
    end
    dispatch_wakeup()
    dispatch_error_queue()
end

我们可以看到如果command是'CALL'只是将session与co关联,然后什么也没有做,他仍然是被挂起的(coroutine.yield()会一直挂起协程,直到coroutine.resume恢复).

可见call另外一个服务会挂起当前服务.那么call何时返回呢?即当前服务(即发送消息的服务)何时恢复呢?

话分两头,我们再说说B服务.

B服务的skynet.start函数中只是调用skynet.dispatch('lua', func),他只是在skynet中注册了'lua'协议的回调函数,然后就返回了.

服务捕获消息并执行后也会调用raw_dispatch_message.通过前几篇的介绍,我们知道lua层执行的第一个消息是skynet.start()调用timeout来发送的,当执行完这个消息之后,代码继续怎么走呢?还是回到那个协程池函数:

local function co_create(f)
    local co = table.remove(coroutine_pool)
    if co == nil then
        co = coroutine.create(function(...)
            f(...)
            while true do
                f = nil
                coroutine_pool[#coroutine_pool+1] = co
                f = coroutine_yield "EXIT"      --①
                f(coroutine_yield())
            end
        end)
    else
        coroutine.resume(co, f)
    end
    return co
end

①的代码就是这个消息函数最后的结果,协程被挂起.至此,skynet.start()要执行的函数就执行完毕了.我们再看后续的流程.coroutine.yield传入的参数是'EXIT'(这个代表了一条消息处理完毕,而不是整个服务退出),在suspend函数中有相应的处理,但是它并不重新恢复协程,什么时候恢复下面会讲到.

为了叙述方便,我们再贴一遍raw_dispatch_message代码:

local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then
        local co = session_id_coroutine[session]
        if co == "BREAK" then
            session_id_coroutine[session] = nil
        elseif co == nil then
            unknown_response(session, source, msg, sz)
        else
            session_id_coroutine[session] = nil
            suspend(co, coroutine.resume(co, true, msg, sz))
        end
    else    --①
        local p = proto[prototype]
        if p == nil then
            if session ~= 0 then
                c.send(source, skynet.PTYPE_ERROR, session, "")
            else
                unknown_request(session, source, msg, sz, prototype)
            end
            return
        end
        local f = p.dispatch
        if f then
            local ref = watching_service[source]
            if ref then
                watching_service[source] = ref + 1
            else
                watching_service[source] = 1
            end
            local co = co_create(f)    --②
            session_coroutine_id[co] = session
            session_coroutine_address[co] = source
            suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))  --③
        else
            unknown_request(session, source, msg, sz, proto[prototype].name)
        end
    end
end

我们注意到收发一般的消息用到的协议是"lua",这个在skynet.lua中有注册过.当B服务收到A服务发来的消息之后,走分支①.刚才我们说到上一消息结束后导致协程被挂起了.

再看②,又调用了co_create(),回去看看他的实现,else分支会重新恢复协程,传递我们自己定义的函数,即B服务代码(main.lua)中的skynet.dispatch函数注册了一个新的回调函数f,然后又挂起了.在③中才恢复了协程,传入resume的参数,执行f.即他将执行f(session,source, p.unpack(msg,sz, ...)).

B的服务解包这个消息的内容为get,name.对应的有command[get]函数,所以他会执行skynet.ret(skynet.pack(f(...))).关键在于skynet.ret(),它用来回复A服务消息,他的实现为:

function skynet.ret(msg, sz)
    msg = msg or ""
    return coroutine_yield("RETURN", msg, sz)
end

于是他又将协程挂起,传入消息相关参数. corutine.resume函数返回,来到suspend()函数.在command == "RETURN"中,他给A服务发送了一条回应消息,那么A服务的地址又是怎么获取的呢?

原来在B收到A的消息时,执行raw_dispatch_message函数中通过表session_coroutine_id和session_coroutine_address保存有A服务的地址,以及该消息的session.

回复的这条消息的类型为skynet.PTYPE_RESPONSE,参数是B处理A的请求并打包的结果.

前面讲到A调用call之后会被挂起,当收到B回复的消息时,再次调用raw_dispatch_message.上面提到消息的类型为skynet.PTYPE_RESPONSE.由于在Call的时候保存了session对应的协程,这里再次通过session就很容易找到协程了.所以一个消息的session至关重要.总结一下,就是A服务给B服务发送消息时会产生一个session,同时消息中也会包含session字段.传递给B时又会回应给A,这样A服务就找到了对应的那个session相关的信息.再次调用coroutine.resume,于是之前挂起的协程恢复运行,也就是yield_call函数(就是在那里coroutine_yield("CALL", session)挂起的)成功返回了B回应的消息,然后解包就可以得到正确的结果.

另外,skynet还有个send函数,他的实现为:

function skynet.send(addr, typename, ...)
    local p = proto[typename]
    return c.send(addr, p.id, 0 , p.pack(...))
end

可以看出,send只是发送一条消息到给对方就直接返回,不会被挂起.还有就是他提供的session ID为0,而call的session ID为nil.他们的区别是,在c接口层,如果sessionID为nil,就会重新生成一个新的session ID.这个sessionID是用来关联当前协程的,上面有讲到.

另外比较有意思的是,服务也可以给自己发消息.对于call,他保存新的sessionID后会挂起,再次收到response类型的消息,根据sessionID找到协程,然后恢复.对于send,他的sessionID为0,没有关联的协程,所以不会响应.

好了,这篇比较长,也比较绕,结合之前写的,多看几遍应该能够理解.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容