APIServer
Server接收用户通过client发来的请求,按照路由规则分发,交给后端处理完毕后将结果返回至client
APIServer
之前说到start()
中的NewDaemon()
是初始化Daemon的核心逻辑,APIServer的创建在start()
中。
- 首先调用
NewAPIServerConfig(cli)
,根据DaemonCli的配置生成APIServer的配置文件,比如log、Version、TLS,并将Cli.Config.Hosts的值置为1,然后将serverConfig
返回。之后如果发现建立config失败,则输出错误信息,反之进入下个环节。 - 第二步调用
apiserver.New(serverConfig)
新建server实例,server中包含了config、router、HTTPServer等,新建之后填充了config的部分,其他仍未初始化。 - 第三步调用
loadListeners()
,在这个函数里根据DaemonCli里的HOSTS,对每一个可用的host,解析出protocol和address,调用listeners.Init()
与Daemon建立socket连接,分配一个Daemon的端口监听socket。即与每个可用的Daemon建立连接(如果你尝试一个APIServer连接到多个daemon的话,是可行的)。 - 到此为止,已经初始化了apiserver,之后会为APIServer初始化router。
func newAPIServerConfig(cli *DaemonCli) (*apiserver.Config, error) {
serverConfig := &apiserver.Config{
Logging: true,
SocketGroup: cli.Config.SocketGroup,
Version: dockerversion.Version,
CorsHeaders: cli.Config.CorsHeaders,
}
if cli.Config.TLS {
tlsOptions := tlsconfig.Options{
CAFile: cli.Config.CommonTLSOptions.CAFile,
CertFile: cli.Config.CommonTLSOptions.CertFile,
KeyFile: cli.Config.CommonTLSOptions.KeyFile,
ExclusiveRootPools: true,
}
if cli.Config.TLSVerify {
// server requires and verifies client's certificate
tlsOptions.ClientAuth = tls.RequireAndVerifyClientCert
}
tlsConfig, err := tlsconfig.Server(tlsOptions)
if err != nil {
return nil, err
}
serverConfig.TLSConfig = tlsConfig
}
if len(cli.Config.Hosts) == 0 {
cli.Config.Hosts = make([]string, 1)
}
return serverConfig, nil
}
func New(cfg *Config) *Server {
return &Server{
cfg: cfg,
}
}
// Server contains instance details for the server
type Server struct {
cfg *Config
servers []*HTTPServer
routers []router.Router
routerSwapper *routerSwapper
middlewares []middleware.Middleware
}
func loadListeners(cli *DaemonCli, serverConfig *apiserver.Config) ([]string, error) {
var hosts []string
for i := 0; i < len(cli.Config.Hosts); i++ {
var err error
if cli.Config.Hosts[i], err = dopts.ParseHost(cli.Config.TLS, cli.Config.Hosts[i]); err != nil {
return nil, fmt.Errorf("error parsing -H %s : %v", cli.Config.Hosts[i], err)
}
protoAddr := cli.Config.Hosts[i]
protoAddrParts := strings.SplitN(protoAddr, "://", 2)
if len(protoAddrParts) != 2 {
return nil, fmt.Errorf("bad format %s, expected PROTO://ADDR", protoAddr)
}
proto := protoAddrParts[0]
addr := protoAddrParts[1]
// It's a bad idea to bind to TCP without tlsverify.
if proto == "tcp" && (serverConfig.TLSConfig == nil || serverConfig.TLSConfig.ClientAuth != tls.RequireAndVerifyClientCert) {
logrus.Warn("[!] DON'T BIND ON ANY IP ADDRESS WITHOUT setting --tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING [!]")
}
ls, err := listeners.Init(proto, addr, serverConfig.SocketGroup, serverConfig.TLSConfig)
if err != nil {
return nil, err
}
ls = wrapListeners(proto, ls)
// If we're binding to a TCP port, make sure that a container doesn't try to use it.
if proto == "tcp" {
if err := allocateDaemonPort(addr); err != nil {
return nil, err
}
}
logrus.Debugf("Listener created for HTTP on %s (%s)", proto, addr)
hosts = append(hosts, protoAddrParts[1])
cli.api.Accept(addr, ls...)
}
return hosts, nil
}
router
初始化router的过程同样在start()
里。
- 调用
newRouterOptions()
进行参数的设置 - 调用
initRouter
。建立了decoder进行命令的解析,之后建立了多个router分别为不同的模块服务,比如checkpoint router, container router, image router, volume router, swarm router等等。接收消息后会根据消息的信令分配不同的router进行处理。对于router的初始化都调用了NewRouter
函数。比如container.NewRouter()
里调用了r.initRoutes()
,其中定义了router.NewGetRoute("/containers/json", r.getContainersJSON)
这样的配置,就为处理命令寻找到了相应的文件。 - 把所有的router初始化好,调用
InitRouter
。吐槽一下命名,只是i大小写的区别都可以吗!很容易看不懂的啊!大写的比较牛逼,定义了API Server使用的main router,它通过调用CreateMux()
中的makeHTTPHandler()
来建立main router。这个函数的形参是responseWriter和Request,就非常好理解是干啥的了,它定义了context这个概念,这是一个全局的变量,然后通过handlerWithGlobalMiddlewares()
定义了handlerFunc来处理request,通过WrapHandler()
把回调函数注册进去了,之后通过mux.Vars()
获取了request相关的router的信息,于是请求的处理过程真正是通过handlerFunc(ctx, w, r, vars)
实现的。
func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Define the context that we'll pass around to share info
// like the docker-request-id.
//
// The 'context' will be used for global data that should
// apply to all requests. Data that is specific to the
// immediate function being called should still be passed
// as 'args' on the function call.
// use intermediate variable to prevent "should not use basic type
// string as key in context.WithValue" golint errors
var ki interface{} = dockerversion.UAStringKey
ctx := context.WithValue(context.Background(), ki, r.Header.Get("User-Agent"))
handlerFunc := s.handlerWithGlobalMiddlewares(handler)
vars := mux.Vars(r)
if vars == nil {
vars = make(map[string]string)
}
if err := handlerFunc(ctx, w, r, vars); err != nil {
statusCode := httputils.GetHTTPErrorStatusCode(err)
if statusCode >= 500 {
logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
}
httputils.MakeErrorHandler(err)(w, r)
}
}
}
func initRouter(opts routerOptions) {
decoder := runconfig.ContainerDecoder{}
routers := []router.Router{
// we need to add the checkpoint router before the container router or the DELETE gets masked
checkpointrouter.NewRouter(opts.daemon, decoder),
container.NewRouter(opts.daemon, decoder),
image.NewRouter(opts.daemon.ImageService()),
systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildCache, opts.buildkit),
volume.NewRouter(opts.daemon.VolumesService()),
build.NewRouter(opts.buildBackend, opts.daemon),
sessionrouter.NewRouter(opts.sessionManager),
swarmrouter.NewRouter(opts.cluster),
pluginrouter.NewRouter(opts.daemon.PluginManager()),
distributionrouter.NewRouter(opts.daemon.ImageService()),
}
if opts.daemon.NetworkControllerEnabled() {
routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))
}
if opts.daemon.HasExperimental() {
for _, r := range routers {
for _, route := range r.Routes() {
if experimental, ok := route.(router.ExperimentalRoute); ok {
experimental.Enable()
}
}
}
}
opts.api.InitRouter(routers...)
}
middleware
那么,还需要关注一下middleware。它的初始化在start中的initMiddlewares()
,形参用到了cli.api、serverConfig、pluginStore。
pluginStore在上一行的plugin.NewStore()
完成初始化,它返回一个Store对象,其中最重要的是定义了注册回调函数的接口
/* handlers are necessary for transition path of legacy plugins
* to the new model. Legacy plugins use Handle() for registering an
* activation callback.*/
handlers map[string][]func(string, *plugins.Client)
initMiddlewares
定义了不同的中间件,包括:
- NewExperimentalMiddleware: 返回一个
experimental string
(true或者false)标志实验性功能是否可用 - NewVersionMiddleware: 返回version信息,包括server、default、minVersion
- NewCORSMiddleware: 不清楚是什么
- authzMiddleware: authorizationPlugin is an internal adapter to docker plugin system,有更具体的解释吗?
// TODO: remove this from cli and return the authzMiddleware
func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config, pluginStore plugingetter.PluginGetter) error {
v := cfg.Version
exp := middleware.NewExperimentalMiddleware(cli.Config.Experimental)
s.UseMiddleware(exp)
vm := middleware.NewVersionMiddleware(v, api.DefaultVersion, api.MinVersion)
s.UseMiddleware(vm)
if cfg.CorsHeaders != "" {
c := middleware.NewCORSMiddleware(cfg.CorsHeaders)
s.UseMiddleware(c)
}
cli.authzMiddleware = authorization.NewMiddleware(cli.Config.AuthorizationPlugins, pluginStore)
cli.Config.AuthzMiddleware = cli.authzMiddleware
s.UseMiddleware(cli.authzMiddleware)
return nil
}
重新回到上面的handlerWithGlobalMiddlewares()
函数,它内部有一个循环,遍历了使用的middlewares,对每个middleware调用WrapHandler()
Middleware的作用docker的解释是:
// Middleware is an interface to allow the use of ordinary functions as Docker API filters.
// Any struct that has the appropriate signature can be registered as a middleware.
WrapHandler对于不同的middleware有不同的实现版本,它的原型是
type Middleware interface {
WrapHandler(func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error
}
开始提供服务
在创建好APIServer,连接上Daemon,创建、设置好Router之后,APIServer就可以向外提供服务了,它提供的是一个HTTP Server之上标准的RESTful-API,对用户非常友好。
APIServer的真正运行采用go routine的方式,go cli.api.wait()
真正启动了APIServer。采用go routine的原因是:如果APIServer出错,daemon会随之退出,采用go routine可以在出错的时候尝试重启,保证docker的正常运行。在wait函数中调用了serveAPI(),由于之前可能和多个Daemon建立了连接,所以对每个HTTP server,调用Serve()
函数。
Serve函数为每个client的连接建立goroutine的服务,服务会解析request,调用handler,返回结果。
该函数并不是docker开发者自己写的,而是调用了GO语言的net/http库。大概里面的函数有:readRequest
,handler := sh.srv.Handler
,ServeHTTP(ResponseWriter, *Request)
等。由于是系统库,所以先不看了。
// serveAPI loops through all initialized servers and spawns goroutine
// with Serve method for each. It sets createMux() as Handler also.
func (s *Server) serveAPI() error {
var chErrors = make(chan error, len(s.servers))
for _, srv := range s.servers {
srv.srv.Handler = s.routerSwapper
go func(srv *HTTPServer) {
var err error
logrus.Infof("API listen on %s", srv.l.Addr())
if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
err = nil
}
chErrors <- err
}(srv)
}
for range s.servers {
err := <-chErrors
if err != nil {
return err
}
}
return nil
}
总结
附上start中的高层流程:
func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
serverConfig, err := newAPIServerConfig(cli)
if err != nil {
return fmt.Errorf("Failed to create API server: %v", err)
}
cli.api = apiserver.New(serverConfig)
hosts, err := loadListeners(cli, serverConfig)
if err != nil {
return fmt.Errorf("Failed to load listeners: %v", err)
}
...
pluginStore := plugin.NewStore()
if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {
logrus.Fatalf("Error creating middlewares: %v", err)
}
...
routerOptions, err := newRouterOptions(cli.Config, d)
if err != nil {
return err
}
routerOptions.api = cli.api
routerOptions.cluster = c
initRouter(routerOptions)
...
serveAPIWait := make(chan error)
go cli.api.Wait(serveAPIWait)
...
errAPI := <-serveAPIWait
APIServer是daemon和docker client通信的接口,在daemon的初始化流程中优先级非常高。通过初始化apiserver实例、router实例、middleware实例,系统已经为APIServer提供了完整的运行环境。Middleware为在docker和用户之间做了一层隔离,为用户提供标准的接口,通过middleware,回调函数被注册进router中。
真正运行时,采用goroutine的方式保证了保证了daemon的稳定运行,当request到达APIServer中后,APIServer通过main router进行查找,调用相应的router,也就执行了相应的回调函数的调用,从而实现了对request的处理。