容器日志处理及实现
容器日志
输出形式:
目前容器日志有两种输出形式:
stdout,stderr 标准输出
这种形式的日志输出我们可以直接使用docker logs查看日志, k8s 集群中同样集群可以使用kubectl logs类似的形式查看日志。
日志文件记录
这种日志输出我们无法从以上方法查看日志内容,只能tail日志文件查看。
收集方式:
不论你的业务容器日志如何输出,都是可以使用统一的日志收集器收集。常见的日志收集方式:
k8s 集群
集群启动时会在每个机器启动一个Fluentd agent收集日志然后发送给 Elasticsearch。实现方式是每个agent挂载目录/var/lib/docker/containers使用fluentd的tail插件扫描每个容器日志文件,直接发送给Elasticsearch。
Fluentd agent起在业务同一个 pod 中共享 volume 然后实现对日志文件的收集发送给Elasticsearch。
docker swarm 集群
swarm 目前暂时没有提供日志查看机制。但是docker cloud提供了与kubectrl logs类似的机制查看 stdout 的日志。目前还没有 fluentd 插件直接对服务进行日志收集,暂时考虑直接使用使用跟容器一样的机制收集。docker service create 支持--log-driver
docker 容器
从 docker1.8 内置了fluentd log driver 。以如下的形式启动容器,容器 stdout/stderr 日志将发往配置的 fluentd 。如果配置后,docker logs将无法使用。另外默认模式下如果你配置得地址没有正常服务,容器无法启动。你也可以使用fluentd-async-connect形式启动, docker daemon 则能在后台尝试连接并缓存日志。
`docker run --log-driver=fluentd --log-opt fluentd-address=myhost.local:24224
`
同样如果是日志文件,将文件暴露出来直接使用 fluentd 收集。
容器日志源码简单分析
# /container/container.go:63 type CommonContainer struct { StreamConfig *stream.Config ... } # /container/stream/streams.go:26 type Config struct { sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser }
moby源码来看,每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe(当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入).
那么针对如上的实例该如何实现日志收集转发?
# /container/container.go:312 func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) { c, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("Failed to get logging factory: %v", err) } ctx := logger.Context{ Config: cfg.Config, ContainerID: container.ID, ContainerName: container.Name, ContainerEntrypoint: container.Path, ContainerArgs: container.Args, ContainerImageID: container.ImageID.String(), ContainerImageName: container.Config.Image, ContainerCreated: container.Created, ContainerEnv: container.Config.Env, ContainerLabels: container.Config.Labels, DaemonName: "docker", } // Set logging file for "json-logger" if cfg.Type == jsonfilelog.Name { ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID)) if err != nil { return nil, err } } return c(ctx) } #/container/container.go:978 func (container *Container) startLogging() error { if container.HostConfig.LogConfig.Type == "none" { return nil // do not start logging routines } l, err := container.StartLogger(container.HostConfig.LogConfig) if err != nil { return fmt.Errorf("Failed to initialize logging driver: %v", err) } copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) container.LogCopier = copier copier.Run() container.LogDriver = l // set LogPath field only for json-file logdriver if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { container.LogPath = jl.LogPath() } return nil }
第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用:logger.GetLogDriver(cfg.Type)
返回一个方法类型:
/daemon/logger/factory.go:9 type Creator func(Context) (Logger, error)
实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:
# /daemon/logger/logger.go:61 type Logger interface { Log(*Message) error Name() string Close() error }
很简单的三个方法,也很容易理解,Log()发送日志消息到driver,Close()进行关闭操作(根据不同实现)。
也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go
第二个方法就是处理日志了,获取到日志driver,在创建一个Copier,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:
#/daemon/logger/copir.go:41 func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done() reader := bufio.NewReader(src) for { select { case <-c.closed: return default: line, err := reader.ReadBytes('\n') line = bytes.TrimSuffix(line, []byte{'\n'}) // ReadBytes can return full or partial output even when it failed. // e.g. it can return a full entry and EOF. if err == nil || len(line) > 0 { if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) } } if err != nil { if err != io.EOF { logrus.Errorf("Error scanning log stream: %s", err) } return } } } }
每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。
日志driver注册器
位于/daemon/logger/factory.go的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):
# /daemon/logger/factory.go:21 func (lf *logdriverFactory) register(name string, c Creator) error { if lf.driverRegistered(name) { return fmt.Errorf("logger: log driver named '%s' is already registered", name) } lf.m.Lock() lf.registry[name] = c lf.m.Unlock() return nil } # /daemon/logger/factory.go:39 func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error { lf.m.Lock() defer lf.m.Unlock() if _, ok := lf.optValidator[name]; ok { return fmt.Errorf("logger: log validator named '%s' is already registered", name) } lf.optValidator[name] = l return nil }
看起来很简单,就是将一个Creator方法类型添加到一个map结构中,将LogOptValidator添加到另一个map这里注意加锁的操作。
#/daemon/logger/factory.go:13 type LogOptValidator func(cfg map[string]string) error
这个主要是验证driver的参数 ,dockerd和docker启动参数中有:--log-opt
实例
云帮怎么实现的
使用自己实现的 zeroMQ-driver 直接将容器日志通过 0MQ 发到日志统一处理中心。在处理中心统一完成下一步处理。如果平台用户需要将日志向外输出或者直接对接平台内日志分析应用,我们的处理是在应用 pod 中启动日志收集插件容器(封装扩展的 fluentd ),根据用户的需要配置日志出口,实现应用级日志收集。容器日志首先是由 docker-daemon 收集到,再根据容器 log-driver 配置进行相应操作,也就是说如果你的宿主机网络与容器网络不通(k8s 集群),日志从宿主机到 pod 中的收集容器只有两种方式:走外层网络,文件挂载。 我们采用文件挂载方式。
以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。
//定义一个struct,这里包含一个zmq套接字 type ZmqLogger struct { writer *zmq.Socket containerId string tenantId string serviceId string felock sync.Mutex } //定义init方法调用logger注册器的方法注册当前driver //和参数验证方法。 func init() { if err := logger.RegisterLogDriver(name, New); err != nil { logrus.Fatal(err) } if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil { logrus.Fatal(err) } } //实现一个上文提到的Creator方法注册logdriver. //这里新建一个zmq套接字构建一个实例 func New(ctx logger.Context) (logger.Logger, error) { zmqaddress := ctx.Config[zmqAddress] puber, err := zmq.NewSocket(zmq.PUB) if err != nil { return nil, err } var ( env = make(map[string]string) tenantId string serviceId string ) for _, pair := range ctx.ContainerEnv { p := strings.SplitN(pair, "=", 2) //logrus.Errorf("ContainerEnv pair: %s", pair) if len(p) == 2 { key := p[0] value := p[1] env[key] = value } } tenantId = env["TENANT_ID"] serviceId = env["SERVICE_ID"] if tenantId == "" { tenantId = "default" } if serviceId == "" { serviceId = "default" } puber.Connect(zmqaddress) return &ZmqLogger{ writer: puber, containerId: ctx.ID(), tenantId: tenantId, serviceId: serviceId, felock: sync.Mutex{}, }, nil } //实现Log方法,这里使用zmq socket发送日志消息 //这里必须注意,zmq socket是线程不安全的,我们知道 //本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。 func (s *ZmqLogger) Log(msg *logger.Message) error { s.felock.Lock() defer s.felock.Unlock() s.writer.Send(s.tenantId, zmq.SNDMORE) s.writer.Send(s.serviceId, zmq.SNDMORE) if msg.Source == "stderr" { s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) } else { s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) } return nil } //实现Close方法,这里用来关闭zmq socket。 //同样注意线程安全,调用此方法的是容器关闭协程。 func (s *ZmqLogger) Close() error { s.felock.Lock() defer s.felock.Unlock() if s.writer != nil { return s.writer.Close() } return nil } func (s *ZmqLogger) Name() string { return name } //验证参数的方法,我们使用参数传入zmq pub的地址。 func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { case zmqAddress: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } } if cfg[zmqAddress] == "" { return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress) } return nil }