Skip to content

Instantly share code, notes, and snippets.

@ruflin
Created January 18, 2016 15:31
Show Gist options
  • Select an option

  • Save ruflin/228e96bbd9c9e16bfdbe to your computer and use it in GitHub Desktop.

Select an option

Save ruflin/228e96bbd9c9e16bfdbe to your computer and use it in GitHub Desktop.
/*
Beat provides the basic environment for each beat.
Each beat implementation has to implement the beater interface.
# Start / Stop / Exit a Beat
A beat is start by calling the Run(name string, version string, bt Beater) function an passing the beater object.
This will create new beat and will Start the beat in its own go process. The Run function is blocked until
the Beat.exit channel is closed. This can be done through calling Beat.Exit(). This happens for example when CTRL-C
is pressed.
A beat can be stopped and started again through beat.Stop and beat.Start. When starting a beat again, it is important to
run it again in it's own go process. To allow a beat to be properly reastarted, it is important that Beater.Stop() properly
closes all channels and go processes.
In case a beat should not run as a long running process, the beater implementation must make sure to call Beat.Exit()
when the task is completed to stop the beat.
*/
package beat
import (
"flag"
"fmt"
"os"
"runtime"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/service"
"github.com/satori/go.uuid"
"sync"
)
// Beater interface that every beat must use
type Beater interface {
Config(*Beat) error
Setup(*Beat) error
Run(*Beat) error
Cleanup(*Beat) error
Stop()
}
// FlagsHandler (optional) Beater extension for
// handling flags input on startup. The HandleFlags callback will
// be called after parsing the command line arguments and handling
// the '--help' or '--version' flags.
type FlagsHandler interface {
HandleFlags(*Beat)
}
// Basic beat information
type Beat struct {
Name string
Version string
Config *BeatConfig
BT Beater
Events publisher.Client
UUID uuid.UUID
exit chan struct{}
}
// Basic configuration of every beat
type BeatConfig struct {
Output map[string]outputs.MothershipConfig
Logging logp.Logging
Shipper publisher.ShipperConfig
}
var printVersion *bool
// Channel that is closed as soon as the beat should exit
func init() {
printVersion = flag.Bool("version", false, "Print version and exit")
}
// Initiates a new beat object
func NewBeat(name string, version string, bt Beater) *Beat {
if version == "" {
version = defaultBeatVersion
}
b := Beat{
Version: version,
Name: name,
BT: bt,
UUID: uuid.NewV4(),
exit: make(chan struct{}),
}
return &b
}
// Initiates and runs a new beat object
func Run(name string, version string, bt Beater) {
b := NewBeat(name, version, bt)
// Runs beat inside a go process
go func() {
err := b.Start()
if err != nil {
// TODO: detect if logging was already fully setup or not
fmt.Printf("Start error: %v\n", err)
logp.Critical("Start error: %v", err)
os.Exit(1)
}
// If start finishes, exit has to be called. This requires start to be blocking
// which is currently the default.
b.Exit()
}()
// Waits until beats channel is closed
select {
case <-b.exit:
b.Stop()
logp.Info("Exit beat completed")
return
}
}
// Start starts the Beat by parsing and interpreting the command line flags,
// loading and parsing the configuration file, and running the Beat. This
// method blocks until the Beat exits. If an error occurs while initializing
// or running the Beat it will be returned.
func (b *Beat) Start() error {
// Additional command line args are used to overwrite config options
err, exit := b.CommandLineSetup()
if err != nil {
return err
}
if exit {
return nil
}
// Loads base config
err = b.LoadConfig()
if err != nil {
return err
}
// Configures beat
err = b.BT.Config(b)
if err != nil {
return err
}
// Run beat. This calls first beater.Setup,
// then beater.Run and beater.Cleanup in the end
return b.Run()
}
// Reads and parses the default command line params
// To set additional cmd line args use the beat.CmdLine type before calling the function
// The second return param is to detect if system should exit. True if should exit
// Exit can also be without error
func (beat *Beat) CommandLineSetup() (error, bool) {
// The -c flag is treated separately because it needs the Beat name
err := cfgfile.ChangeDefaultCfgfileFlag(beat.Name)
if err != nil {
return fmt.Errorf("failed to fix the -c flag: %v\n", err), true
}
flag.Parse()
if *printVersion {
fmt.Printf("%s version %s (%s)\n", beat.Name, beat.Version, runtime.GOARCH)
return nil, true
}
// if beater implements CLIFlags for additional CLI handling, call it now
if flagsHandler, ok := beat.BT.(FlagsHandler); ok {
flagsHandler.HandleFlags(beat)
}
return nil, false
}
// LoadConfig inits the config file and reads the default config information
// into Beat.Config. It exists the processes in case of errors.
func (b *Beat) LoadConfig() error {
err := cfgfile.Read(&b.Config, "")
if err != nil {
return fmt.Errorf("loading config file error: %v\n", err)
}
err = logp.Init(b.Name, &b.Config.Logging)
if err != nil {
return fmt.Errorf("error initializing logging: %v\n", err)
}
// Disable stderr logging if requested by cmdline flag
logp.SetStderr()
logp.Debug("beat", "Initializing output plugins")
pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper)
if err != nil {
return fmt.Errorf("error Initialising publisher: %v\n", err)
}
b.Events = pub.Client()
logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version)
return nil
}
// Run calls the beater Setup and Run methods. In case of errors
// during the setup phase, it exits the process.
func (b *Beat) Run() error {
// Setup beater object
err := b.BT.Setup(b)
if err != nil {
return fmt.Errorf("setup returned an error: %v", err)
}
// Up to here was the initialization, now about running
if cfgfile.IsTestConfig() {
logp.Info("Testing configuration file")
// all good, exit
return nil
}
service.BeforeRun()
// Callback is called if the processes is asked to stop.
// This needs to be called before the main loop is started so that
// it can register the signals that stop or query (on Windows) the loop.
service.HandleSignals(b.Exit)
logp.Info("%s sucessfully setup. Start running.", b.Name)
// Run beater specific stuff
err = b.BT.Run(b)
if err != nil {
logp.Critical("Running the beat returned an error: %v", err)
}
return err
}
// Stop calls the beater Stop action.
// It can happen that this function is called more then once.
func (b *Beat) Stop() {
logp.Info("Stopping Beat")
b.BT.Stop()
service.Cleanup()
logp.Info("Cleaning up %s before shutting down.", b.Name)
// Call beater cleanup function
err := b.BT.Cleanup(b)
if err != nil {
logp.Err("Cleanup returned an error: %v", err)
}
}
var callback sync.Once
// Exiting beat -> shutdown
func (b *Beat) Exit() {
callback.Do(func() {
logp.Info("Start exiting beat")
close(b.exit)
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment