我们要想进行二次开发，首先需要把它相关的函数和运行逻辑摸清楚。
beats/winlogbeat/beater/winlogbeat.go
func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) { // Read configuration. config := config.DefaultSettings err := b.BeatConfig.Unpack(&config) if err != nil { return nil, fmt.Errorf("Error reading configuration file. %v", err) } // resolve registry file path config.RegistryFile = paths.Resolve(paths.Data, config.RegistryFile) logp.Info("State will be read from and persisted to %s", config.RegistryFile) eb := &Winlogbeat{ beat: b, config: config, done: make(chan struct{}), } if err := eb.init(b); err != nil { //初始化配置文件 查看取的日志名称是否存在 return nil, err } return eb, nil }beats/libbeat/cmd/run.go
func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command { name := settings.Name runCmd := cobra.Command{ Use: "run", Short: "Run " + name, Run: func(cmd *cobra.Command, args []string) { err := instance.Run(settings, beatCreator) // 下一个运行点调用run if err != nil { os.Exit(1) } }, } // Run subcommand flags, only available to *beat run runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("N")) runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("httpprof")) runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("cpuprofile")) runCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("memprofile")) if settings.RunFlags != nil { runCmd.Flags().AddFlagSet(settings.RunFlags) } return &runCmd }beats/libbeat/cmd/instance/beat.go
func Run(settings Settings, bt beat.Creator) error { err := setUmaskWithSettings(settings) if err != nil && err != errNotImplemented { return errw.Wrap(err, "could not set umask") } ...... return b.launch(settings, bt) // 下一个点 }()) }beats/libbeat/cmd/instance/beat.go
func (b *Beat) launch(settings Settings, bt beat.Creator) error { defer logp.Sync() ...... return beater.Run(&b.Beat) // 运行入口点 }beats/winlogbeat/beater/winlogbeat.go 根据采集的日志类型会创建相应的协程。
// Run is used within the beats interface to execute the Winlogbeat workers. func (eb *Winlogbeat) Run(b *beat.Beat) error { if err := eb.setup(b); err != nil { return err } acker := newEventACKer(eb.checkpoint) persistedState := eb.checkpoint.States() // Initialize metrics. initMetrics("total") // setup global event ACK handler err := eb.pipeline.SetACKHandler(beat.PipelineACKHandler{ ACKEvents: acker.ACKEvents, }) if err != nil { return err } var wg sync.WaitGroup for _, log := range eb.eventLogs { state, _ := persistedState[log.source.Name()] // Start a goroutine for each event log. wg.Add(1) go eb.processEventLog(&wg, log, state, acker) // 主要采集函数 } wg.Wait() defer eb.checkpoint.Shutdown() if eb.config.ShutdownTimeout > 0 { logp.Info("Shutdown will wait max %v for the remaining %v events to publish.", eb.config.ShutdownTimeout, acker.Active()) ctx, cancel := context.WithTimeout(context.Background(), eb.config.ShutdownTimeout) defer cancel() acker.Wait(ctx) } return nil }beats/winlogbeat/beater/winlogbeat.go
func (eb *Winlogbeat) processEventLog( wg *sync.WaitGroup, logger *eventLogger, state checkpoint.EventLogState, acker *eventACKer, ) { defer wg.Done() logger.run(eb.done, eb.pipeline, state, acker) }连接获取日志并Publish winlogbeat/beater/eventlogger.go
func (e *eventLogger) run( done <-chan struct{}, pipeline beat.Pipeline, state checkpoint.EventLogState, acker *eventACKer, ) { api := e.source // Initialize per event log metrics. initMetrics(api.Name()) client, err := e.connect(pipeline) if err != nil { logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline", api.Name()) return } // close client on function return or when `done` is triggered (unblock client) defer client.Close() go func() { <-done client.Close() }() err = api.Open(state) if err != nil { logp.Warn("EventLog[%s] Open() error. No events will be read from "+ "this source. %v", api.Name(), err) return } defer func() { logp.Info("EventLog[%s] Stop processing.", api.Name()) if err := api.Close(); err != nil { logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err) return } }() debugf("EventLog[%s] opened successfully", api.Name()) for stop := false; !stop; { select { case <-done: return default: } // Read from the event. records, err := api.Read() //读取事件日志 beats/winlogbeat/eventlog/eventlog.go switch err { case nil: case io.EOF: // Graceful stop. stop = true default: logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err) return } debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records)) if len(records) == 0 { time.Sleep(time.Second) continue } acker.Add(len(records)) for _, lr := range records { client.Publish(lr.ToEvent()) //push数据 } } }beats/libbeat/publisher/pipeline/client.go
func (c *client) Publish(e beat.Event) { c.mutex.Lock() defer c.mutex.Unlock() c.publish(e) } func (c *client) publish(e beat.Event) { var ( event = &e publish = true log = c.pipeline.monitors.Logger ) c.onNewEvent() if !c.isOpen.Load() { // client is closing down -> report event as dropped and return c.onDroppedOnPublish(e) return } if c.processors != nil { var err error event, err = c.processors.Run(event) publish = event != nil if err != nil { // TODO: introduce dead-letter queue? log.Errorf("Failed to publish event: %v", err) } } if event != nil { e = *event } open := c.acker.addEvent(e, publish) if !open { // client is closing down -> report event as dropped and return c.onDroppedOnPublish(e) return } if !publish { c.onFilteredOut(e) return } e = *event pubEvent := publisher.Event{ Content: e, Flags: c.eventFlags, } if c.reportEvents { c.pipeline.waitCloser.inc() } var published bool if c.canDrop { published = c.producer.TryPublish(pubEvent) } else { published = c.producer.Publish(pubEvent) } if published { c.onPublished() } else { c.onDroppedOnPublish(e) if c.reportEvents { c.pipeline.waitCloser.dec(1) } } }