На последнем Zabbix Summit 2019 вместе с выходом Zabbix 4.4 был анонсирован новый Zabbix Agent 2, ключевая фишка которого — возможность написания плагинов к нему на языке Go. И многие сразу стали спрашивать: а как же, собственно, эти плагины писать, как они устроены? Где взять документацию и примеры?
В этой статье я хочу дать ответы на эти и некоторые другие вопросы. Обо всём по порядку, но если вы из тех, кто сразу рвётся в бой, смело пропускайте вступительную часть и переходите к практике ⎝◔◞ ◔⎠
Итак...
Плагин для первого агента мог запускаться в нескольких различных процессах, не давая создателю достаточного контроля над ним, чтобы реализовать, например, использование постоянных соединений, сохранение состояния между проверками, приём трапов — делать подобные вещи было либо сложно, либо вообще невозможно.
Новый агент имеет совершенно иную архитектуру. Он написан с нуля на языке Go (с некоторым переиспользованием Си кода из Zabbix Agent), что значительно упрощает написание плагинов по сравнению с созданием их на языке Си. С помощью Go-агент — предлагаю для простоты называть его так — можно решать описанные выше задачи простым и понятным образом. Несмотря на это, Go-агент обратно совместим с классическим Zabbix Agent на уровне протокола, конфигурации и метрик.
Новые возможности, появившиеся с Go-агентом:
Основные компоненты агента — это ServerConnector, ServerListener и Scheduler.
ServerConnector управляет коммуникацией с сервером (получение конфигурации/экспорт данных), конфигурацией items и кэшем исторических данных. Создаётся один коннектор на каждый активный сервер.
ServerListener принимает пассивные запросы и направляет их в Scheduler. Сейчас его функциональность ограничена только этим, но в будущем она может быть расширена.
Scheduler управляет очередью задач в соответствии с расписанием и настройками конкурентности. Агент запускает единственный Scheduler для управлением задачами (плагинами) в соответствии с расписанием, определяемым настройками item'ов.
Внутреннее устройство агента можно условно представить в разрезе двух типов проверок: активные и пассивные (вскоре ещё появятся Bulk Passive, но пока их нет). Тут важно понимать, что все они разделяют общие компоненты, а разделение на типы сделано только для упрощения восприятия.
Схемы ниже иллюстрируют взаимодействие компонентов для каждого типа. Прошу прощения за кривые картинки — не осилил PlantUML ¯\(ツ)/¯
Для каждого активного сервера создаётся пара: ServerConnector и ResultCache, каждый из которых запускается в своей горутине.
Классические пассивные проверки также используют Scheduler для управления задачами, но вместо ServerConnector'а используется ServerListener в качестве источника конфигурации item'ов. Результаты не кэшируются, а сразу отправляются в ResultWriter, отсылающий данные на сервер в ответ на запрос.
ExporterTask используется для активных проверок (и bulk пассивных проверок в будущем). Такая задача содержит item, который необходимо переодически опрашивать. Scheduler вызывает функцию Export интерфейса Exporter в отдельной горутине и записывает результат её выполнения в ResultWriter.
directExporterTask
directExporterTask используется для пассивных проверок и отличается от ExporterTask тем, что, в случае отсутствия результата опроса метрики (пустое значение), задача будет возвращена в очередь, и через 1 секунду планировщик попытается выполнить её повторно. Так будет повторяться до момента получения результата либо до наступления таймаута. Ещё одно отличие — directExporterTask не позволяет возвращать несколько значений.
watcherTask
WatcherTask содержит список метрик (запросов) для мониторинга. Планировщик вызывает функцию Watch интерфейса Watcher, передавая в качестве параметров список запросов.
collectorTask
Scheduler вызывает функцию Collect интерфейса Collector каждые Period() секунд.
starterTask
Планировщик вызывает функцию Start интерфейса Runner, когда плагин активирован.
stopperTask
Планировщик вызывает функцию Stop интерфейса Runner, когда плагин остановлен.
configuratorTask
Планировщик вызывает функцию Configure интерфейса Configurator, передавая ей в качестве параметров структуру с глобальными опциями агента и структуру с опциями, относящимися к конкретному плагину.
Exporter и Watcher определяют способ работы с данными: Exporter реализует pull модель, а Watcher — push.
Export(key string, params []string, context ContextProvider) (result interface{}, err error)
}
Exporter — это простейший интерфейс, который выполняет опрос и возвращает значение, несколько значений, ошибку или же не возвращает ничего. Он принимает предобработанный ключ, параметры ключа и контекст. Для большинства плагинов этого достаточно. Замечу, что это единственный интерфейс, позволяющий конкурентный доступ. Все остальные интерфейсы имеют эксклюзивный доступ, и ни один другой метод не может работать параллельно, пока плагин выполняет какую-либо задачу.
Будьте внимательны, если созданный плагин реализует конкурентный доступ к данным. В этом случае нужно самим обеспечить правильный доступ нескольких потоков к разделяемым данным. Для этого в вашем распоряжении есть весь арсенал языка Go: мьютексы, каналы, атомарные счетчики, sync.Map и другие примитивы синхронизации. Не забывайте использовать race-детектор, чтобы обнаружить возможные состояния гонки.
Существует лимит на количество конкурентных запросов функции Export() — максимум 100 запросов на плагин. При необходимости этот лимит можно уменьшать для каждого плагина в отдельности, используя функцию plugin.Base.SetCapacity.
func (b *Base) SetCapacity(capacity int)
Кроме того, capacity можно установить с помощью одноимённого параметра в конфигурационном файле. Например:
Plugins.<PluginName>.Capacity=1
Watch(requests []*Request, context ContextProvider)
}
Watcher позволяет плагину реализовать собственный процесс опроса метрик, не используя встроенный планировщик агента. Это может быть актуально для плагинов, использующих механизм trapping, которым нужен полный контроль над сбором и экспортом данных. Основной use case для интерфейса — ждать данные, и, по мере их поступления, отправлять результаты на сервер. Так, например, можно реализовать мониторинг логов или плагин, который подписывается на события от внешнего источника и ждёт, когда ему придут данные.
Collect() error
Period() int
}
Collector используется в случаях, когда плагину необходимо собирать данные через регулярные интервалы времени. Он не умеет возвращать данные самостоятельно, поэтому для возврата его нужно использовать в связке с Exporter’ом.
Типичный use case для Collector — частый сбор данных и помещение их в кэш, где записи будут храниться до момента, пока их не запросит Zabbix сервер.
У Collector’а есть 2 функции:
Start()
Stop()
}
Runner предоставляет средства для выполнения инициализации, когда плагин активирован (функция Start), и деинициализации, когда плагин не используется и остановлен (функция Stop).
Реализовав этот интерфейс, плагин может, к примеру, запускать или останавливать какой-либо фоновый поток, освобождать неиспользуемые ресурсы, закрывать соединения и т.д.
Активация и деактивация плагина происходит в зависимости от наличия или отсутствия запросов (метрик) для обработки. В случае активных проверок, когда обновляется конфигурация с сервера (Zabbix Server или Proxy), планировщик получает новые задачи. Как только появится первая задача, предназначенная на выполнение нашему плагину, он активируется. Остановка произойдет, когда в конфигурации больше не остается запросов к плагину. В случае пассивных проверок, плагин активируется в момент, когда приходит запрос от сервера, и останавливается через 24 часа после поступления последнего запроса.
Configure(globalOptions *GlobalOptions, privateOptions interface{})
Validate(privateOptions interface{}) error
}
Интерфейс Configurator нужен, чтобы предоставить возможность конфигурировать плагин.
Интерфейс имеет 2 функции:
Параметры конфигурации Go-агента по большей части совместимы с Zabbix агентом за несколькими исключениями.
Всё остальное, включая плагины, реализующие сбор стандартных метрик (таких как ЦПУ, сеть, диски, память и т.д.) — это внешние плагины. Созданные нами плагины будут работать наравне с ними и иметь такие же возможности. Располагаются внешние плагины в директории go/plugins, каждый в своём подкаталоге.
package packageName
import "zabbix.com/pkg/plugin"
type Plugin struct {
plugin.Base
}
var impl Plugin
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (res interface{}, err error) {
// Write your code here
return
}
func init() {
plugin.RegisterMetrics(&impl, "PluginName", "key", "Description.")
}
Выглядит не сложнее, чем скрипт на bash или python, правда? ⎝^ω^⎠ Осталось только добавить к нему код, который будет делать какую-то полезную работу.
Давайте немного попрактикуемся и попробуем написать плагин, возвращающий прогноз погоды для города, который мы передадим ему в качестве параметра ключа.
Для этого нам нужно сделать следующее:
Для начала скачаем исходные коды Zabbix.
$ git clone https://git.zabbix.com/scm/zbx/zabbix.git --depth 1 zabbix-agent2
$ cd zabbix-agent2
# Вы можете работать в master ветке, но я бы рекомендовал создать свою на основе одной из стабильных веток
$ git checkout -b feature/myplugin release/4.4
Создадим каталог src/go/plugins/weather и пустой файл weather.go в нём, который будет содержать наш код.
Далее, импортируем встроенный пакет "zabbix.com/pkg/plugin".
package weather
import "zabbix.com/pkg/plugin"
Определяем свою структуру, в которую встраиваем структуру Base из пакета plugin. Она понадобится нам в дальнейшем.
type Plugin struct {
plugin.Base
}
var impl Plugin
Теперь напишем код для получения и обработки данных. Всё, что нам нужно сделать, это:
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
if len(params) != 1 {
return nil, errors.New("Wrong parameters.")
}
// https://github.com/chubin/wttr.in
res, err := p.httpClient.Get(fmt.Sprintf("https://wttr.in/~%s?format=%%t", params[0]))
if err != nil {
return nil, err
}
temp, err := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
if err != nil {
return nil, err
}
return string(temp)[0 : len(temp)-4], nil
}
Следующий шаг — зарегистрировать метрики, чтобы агент узнал о них и смаршрутизировал их обработку к нашему плагину.
Воспользуемся функцией plugin.RegisterMetrics.
// impl — указатель на реализацию плагина
// name — имя плагина
// params — список метрик и описаний для них (key1, descr1, key2, descr2, keyN, descrN...)
func RegisterMetrics(impl Accessor, name string, params ...string)
Вызовем её из функции init (это произойдёт сразу при старте агента).
func init() {
plugin.RegisterMetrics(&impl, "Weather", "weather.temp", "Returns Celsius temperature.")
}
Одним вызовом этой функции мы могли бы зарегистрировать сразу несколько метрик, если бы они у нас были.
package plugins
import (
_ "zabbix.com/plugins/kernel"
_ "zabbix.com/plugins/log"
// ...
_ "zabbix.com/plugins/weather"
)
Кстати, на данный момент поддерживаются 3 платформы: linux, darwin и windows. В будущем, этот список, вероятно, будет расширен.
И последнее: чтобы рассказать агенту о существовании нашего плагина и подключить его при компиляции, нужно включить его в список импортов в файлы src/go/plugins/plugins_<platform>.go.
Нам понадобится версия Go не ниже 1.13.
Чтобы собрать агент вместе с нашим плагином, нужно просто добавить опцию --enable-agent2 во время конфигурации и запустить make.
$ cd <zabbix-source>
$ ./bootstrap.sh; ./configure --enable-agent2 --enable-static; make
После окончания сборки можно запустить агент и проверить, как работает новый плагин. В данном случае мы передаем ему в качестве параметра “moscow” и получаем ответ.
$ <zabbix-source>/src/go/bin/zabbix_agent2 -t weather.temp[moscow]
+1
Собирать агент нужно только один раз. При дальнейшей разработке плагина, мы можем использовать команду go run, чтобы быстро проверить работу кода.
$ go run <zabbix-source>/src/go/cmd/zabbix_agent2/zabbix_agent2.go
Допустим, мы хотим, чтобы Timeout имел допустимый диапазон от 1 до 30 секунд, был опциональным и по умолчанию (если не задан) равнялся бы глобальному таймауту агента.
Определим структуру, описывающую нашу конфигурацию.
type PluginOptions struct {
// Timeout is the maximum time for waiting when a request has to be done. Default value equals the global timeout.
Timeout int `conf:"optional,range=1:30"`
}
Если вы обратили внимание, мы описали допустимый диапазон и признак опциональности параметра с помощью метаданных в тэге conf. Агент умеет использовать эти данные при чтении конфига.
Метаданные имеют такой формат: [name=<name>,][optional,][range=<range>,][default=<default value>], где:
type Plugin struct {
plugin.Base
options PluginOptions
httpClient http.Client
}
Реализуем интерфейс Configurator. Как мы помним, у него 2 метода: Configure и Validate.
func (p *Plugin) Configure(global *plugin.GlobalOptions, privateOptions interface{}) {
if err := conf.Unmarshal(privateOptions, &p.options); err != nil {
p.Errf("cannot unmarshal configuration options: %s", err)
}
// Set default value
if p.options.Timeout == 0 {
p.options.Timeout = global.Timeout
}
p.httpClient = http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
}
func (p *Plugin) Validate(privateOptions interface{}) error {
// Nothing to validate
return nil
}
Вызовом функции conf.Unmarshal загружаем параметры плагина в заданную нами структуру.
Заменим вызов http.Get на p.httpClient.Get.
res, err := p.httpClient.Get(fmt.Sprintf("https://wttr.in/~%s?format=%%t", params[0]))
if err != nil {
if err.(*url.Error).Timeout() {
return nil, errors.New("Request timeout.")
}
return nil, err
}
Наконец, мы можем добавить наш параметр в конфигурационный файл агента:
Plugins.Weather.Timeout=1
Теперь если таймаут будет превышен, плагин должен выдать ошибку.
Но что, если мы введём какое-то недопустимое значение и запустим агент? Вы можете проверить — агент просто запустится и даже не выругается. Timeout будет установлен в default, т.е. будет равен глобальному таймауту.
Предупреждение в логе появится лишь в момент первого обращения к плагину (только тогда он активируется, и будут вызваны методы Validate и Configure).
Это не совсем то поведение, которое нам нужно. Пожалуй, было бы правильнее, если бы агент падал, в случае, когда конфиг некорректен. Для этого достаточно доработать метод Validate. Он вызывается при старте агента для всех плагинов, которые его реализуют.
func (p *Plugin) Validate(privateOptions interface{}) error {
var opts PluginOptions
return conf.Unmarshal(privateOptions, &opts)
}
Теперь если мы введём ошибочное значение параметра, то уже при запуске агента получим ошибку, подобную этой: "cannot create scheduling manager: invalid plugin Weather configuration: Cannot assign configuration: invalid parameter Plugins.Weather.Timeout at line 411: value out of range".
В следующих версиях агента будет добавлена возможность реконфигурации плагинов "на лету". При получении соответствующей runtime команды будут вызываться методы Validate и Configure, и плагин будет иметь возможность реагировать на них и обновлять свои настройки. Будьте внимательны, если вы создаёте какие-то горутины прямо из Configure — вы можете столкнуться с тем, что при реконфигурации будут запускаться всё новые и новые экземпляры этих горутин. Возможно, стоит вынести их запуск и остановку в методы Start и Stop (интерфейс Runner).
Полный исходный код плагина можно взять здесь: https://github.com/VadimIpatov/zabbix-weather-plugin.
Долой синтетические примеры! Напишем что-нибудь полезное. Пусть это будет плагин, поэтапно замеряющий время выполнения HTTP запроса и вычисляющий перцентили на основе собранной статистики.
Для начала реализуем метод сбора данных. Для этого воспользуемся пакетом "net/http/httptrace" (был представлен в Go 1.7).
type timeSample struct {
DnsLookup float64 `json:"dnsLookup"`
Connect float64 `json:"connect"`
TlsHandshake float64 `json:"tlsHandshake"`
FirstResponseByte float64 `json:"firstResponseByte"`
Rtt float64 `json:"rtt"`
}
func (p *Plugin) measureTime(url string) (timeSample, error) {
var (
sample timeSample
start, connect, dns, tlsHandshake time.Time
)
req, _ := http.NewRequest("GET", url, nil)
trace := &httptrace.ClientTrace{
DNSStart: func(_ httptrace.DNSStartInfo) {
dns = time.Now()
},
DNSDone: func(_ httptrace.DNSDoneInfo) {
sample.DnsLookup = float64(time.Since(dns) / time.Millisecond)
},
ConnectStart: func(_, _ string) {
connect = time.Now()
},
ConnectDone: func(net, addr string, err error) {
if err != nil {
p.Errf("unable to connect to host %s: %s", addr, err.Error())
}
sample.Connect = float64(time.Since(connect) / time.Millisecond)
},
GotFirstResponseByte: func() {
sample.FirstResponseByte = float64(time.Since(start) / time.Millisecond)
},
TLSHandshakeStart: func() {
tlsHandshake = time.Now()
},
TLSHandshakeDone: func(_ tls.ConnectionState, _ error) {
sample.TlsHandshake = float64(time.Since(tlsHandshake) / time.Millisecond)
},
}
ctx, cancel := context.WithTimeout(req.Context(), time.Duration(p.options.Timeout)*time.Second)
defer cancel()
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
start = time.Now()
if _, err := http.DefaultTransport.RoundTrip(req); err != nil {
return timeSample{}, err
}
sample.Rtt = float64(time.Since(start) / time.Millisecond)
return sample, nil
}
Для вычисления перцентилей нам потребуется где-то хранить собранные данные. Для этой цели нам нужна циклическая очередь (Ring Buffer). Чтобы не усложнять наш пример, воспользуемся готовым решением — github.com/VadimIpatov/gcircularqueue. Это далеко не самая эффективная реализация, зато она позволит сохранить читаемость кода. Для вычисления перцентилей тоже воспользуемся силой Open Source и богатством экосистемы Go — я остановился на пакете github.com/montanaflynn/stats. Теперь мы можем описать структуры для хранения данных.
type Plugin struct {
plugin.Base
urls map[string]*urlUnit
sync.Mutex
options Options
}
type urlUnit struct {
url string
history *gcircularqueue.CircularQueue
accessed time.Time // last access time
modified time.Time // data collect time
}
Для инициализации и очистки ресурсов используем методы Start и Stop интерфейса Runner.
func (p *Plugin) Start() {
p.urls = make(map[string]*urlUnit)
}
func (p *Plugin) Stop() {
p.urls = nil
}
Сбор данных реализуем при помощи интерфейса Collector.
func (p *Plugin) Collect() (err error) {
now := time.Now()
p.Lock()
for key, url := range p.urls {
if now.Sub(url.accessed) > maxInactivityPeriod {
p.Debugf("removed expired url %s", url.url)
delete(p.urls, key)
continue
}
res, err := p.measureTime(url.url)
if err != nil {
p.Errf(err.Error())
continue
}
url.history.Push(res)
if url.history.IsFull() {
_ = url.history.Shift()
}
url.modified = now
}
p.Unlock()
return
}
func (p *Plugin) Period() int {
return p.options.Interval
}
Здесь мы в цикле бежим по списку URL (нам ещё предстоит его наполнить), для каждого из которых вызываем метод p.measureTime(url.url) и помещаем результат в буфер. Чтобы сделать точную привязку данных к времени, мы сохраняем время опроса в url.modified.
Так же мы удаляем те URL из списка, по которым давно не было обращений.
Как вы помните, Collector не умеет экспортировать данные. Нам нужен Exporter.
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
if len(params) != 1 {
return nil, errors.New("Wrong parameters.")
}
url, err := parseURL(params[0])
if err != nil {
return nil, err
}
switch key {
case keyHttpTraceStats:
if _, ok := p.urls; !ok { p.urls[url] = &urlUnit{ url...thub.com/VadimIpatov/zabbix-httptrace-plugin.
Использовать её очень просто:
$ zabbix_agent2 -R metrics
...
[Weather]
active: true
capacity: 0/100
tasks: 0
weather.temp: Returns Celsius temperature.
...
Эту информацию можно получить и другим способом — по HTTP. Для этого в конфиге агента нужно задать параметр StatusPort=, перезапустить агент и направить браузер на адрес http://<ZabbixAgentHost>:<Port>/status.
Templates guidelines — здесь мы собрали лучшие практики и свои рекомендации по разработке качественных шаблонов.
An official guide to making and managing great templates — презентация с последнего Zabbix Summit на эту же тему.
Magic of the new zabbix agent — презентация Zabbix Agent 2.
Официальная документация по Zabbix Agent 2.
Больше примеров кода вы найдёте в исходных кодах Zabbix Agent 2 (наш репозиторий тут: git.zabbix.com). Здесь можно посмотреть, как реализованы стандартные проверки.
Исходный код плагина Weather.
Исходный код плагина HttpTrace.
Writing watcher Zabbix Agent2 MQTT plugin in Go — отличный пример использования Watcher интерфейса.
Если вдруг вы ещё не знакомы с языком Go, обратите внимание на "Маленькую книгу о Go" и, конечно, пройдите официальный A Tour of Go ʕ☉Ѡ☉ʔ
В этой статье я хочу дать ответы на эти и некоторые другие вопросы. Обо всём по порядку, но если вы из тех, кто сразу рвётся в бой, смело пропускайте вступительную часть и переходите к практике ⎝◔◞ ◔⎠
Итак...
Что за новый агент, и зачем он появился?
Если вы пробовали писать плагины для первого Zabbix Agent, или хотя бы намеревались это сделать, то, наверняка, отметили, что ваши возможности весьма ограничены.Плагин для первого агента мог запускаться в нескольких различных процессах, не давая создателю достаточного контроля над ним, чтобы реализовать, например, использование постоянных соединений, сохранение состояния между проверками, приём трапов — делать подобные вещи было либо сложно, либо вообще невозможно.
Новый агент имеет совершенно иную архитектуру. Он написан с нуля на языке Go (с некоторым переиспользованием Си кода из Zabbix Agent), что значительно упрощает написание плагинов по сравнению с созданием их на языке Си. С помощью Go-агент — предлагаю для простоты называть его так — можно решать описанные выше задачи простым и понятным образом. Несмотря на это, Go-агент обратно совместим с классическим Zabbix Agent на уровне протокола, конфигурации и метрик.
Новые возможности, появившиеся с Go-агентом:
- конфигурация плагинов на уровне общего сonfig файла с агентом;
- конкурентное выполнение проверок "out-of-the-box";
- полный контроль над процессом сбора и экспорта данных;
- поддержка плагинов на Windows платформах;
- установка таймаутов для каждого плагина в отдельности.
Новый агент уже доступен в Zabbix 4.4 в качестве экспериментальной фичи, а с выходом Zabbix 5.0 он получит статус "production-ready".
Архитектура агента
Прежде чем написать первый плагин, давайте разберемся в общих чертах, как всё это устроено "под капотом". Сразу отмечу, что информация актуальна для Zabbix 4.4. Учитывая, что Go-агент пока имеет статус экспериментальной фичи, не исключаю, что к выходу Zabbix 5.0 что-то может поменяться.Основные компоненты агента — это ServerConnector, ServerListener и Scheduler.
ServerConnector управляет коммуникацией с сервером (получение конфигурации/экспорт данных), конфигурацией items и кэшем исторических данных. Создаётся один коннектор на каждый активный сервер.
ServerListener принимает пассивные запросы и направляет их в Scheduler. Сейчас его функциональность ограничена только этим, но в будущем она может быть расширена.
Scheduler управляет очередью задач в соответствии с расписанием и настройками конкурентности. Агент запускает единственный Scheduler для управлением задачами (плагинами) в соответствии с расписанием, определяемым настройками item'ов.
Внутреннее устройство агента можно условно представить в разрезе двух типов проверок: активные и пассивные (вскоре ещё появятся Bulk Passive, но пока их нет). Тут важно понимать, что все они разделяют общие компоненты, а разделение на типы сделано только для упрощения восприятия.
Схемы ниже иллюстрируют взаимодействие компонентов для каждого типа. Прошу прощения за кривые картинки — не осилил PlantUML ¯\(ツ)/¯
Активные проверки
Для каждого активного сервера создаётся пара: ServerConnector и ResultCache, каждый из которых запускается в своей горутине.
Пассивные проверки
Классические пассивные проверки также используют Scheduler для управления задачами, но вместо ServerConnector'а используется ServerListener в качестве источника конфигурации item'ов. Результаты не кэшируются, а сразу отправляются в ResultWriter, отсылающий данные на сервер в ответ на запрос.
Обработка конфигурации
Получив конфигурацию items от Zabbix сервера, ServerConnector обновляет данные у себя и создает updateRequest для каждого плагина, предоставляющего соответствующие метрики. Запросы через канал передаются планировщику, который создаёт задачи и помещает их в очередь. Таким образом, они выполнятся незамедлительно в тот момент, когда у плагина не останется никаких других задач.Планировщик и задачи
Взаимодействие агента с плагинами строится через двухуровневую очередь задач:- у каждого плагина есть очередь задач;
- у планировщика есть очередь активных плагинов.
За счёт этого достигается лучшая конкурентность. Когда задача не может быть выполнена из-за лимитов конкурентности, плагин извлекается из очереди планировщика (но сама задача остаётся в очереди плагина) и возвращается туда только тогда, когда следующая задача может быть выполнена.
- configuratorTask
- starterTask
- collectorTask
- watcherTask
- exporterTask (directExporterTask)
- stopperTask
Базовая задача (taskBase) содержит ссылку на плагин, запланированное время выполнения и другие служебные данные, отличающиеся в зависимости от типа задачи.
ExporterTask используется для активных проверок (и bulk пассивных проверок в будущем). Такая задача содержит item, который необходимо переодически опрашивать. Scheduler вызывает функцию Export интерфейса Exporter в отдельной горутине и записывает результат её выполнения в ResultWriter.
directExporterTask
directExporterTask используется для пассивных проверок и отличается от ExporterTask тем, что, в случае отсутствия результата опроса метрики (пустое значение), задача будет возвращена в очередь, и через 1 секунду планировщик попытается выполнить её повторно. Так будет повторяться до момента получения результата либо до наступления таймаута. Ещё одно отличие — directExporterTask не позволяет возвращать несколько значений.
watcherTask
WatcherTask содержит список метрик (запросов) для мониторинга. Планировщик вызывает функцию Watch интерфейса Watcher, передавая в качестве параметров список запросов.
collectorTask
Scheduler вызывает функцию Collect интерфейса Collector каждые Period() секунд.
starterTask
Планировщик вызывает функцию Start интерфейса Runner, когда плагин активирован.
stopperTask
Планировщик вызывает функцию Stop интерфейса Runner, когда плагин остановлен.
configuratorTask
Планировщик вызывает функцию Configure интерфейса Configurator, передавая ей в качестве параметров структуру с глобальными опциями агента и структуру с опциями, относящимися к конкретному плагину.
Интерфейсы
Всего доступно 5 интерфейсов: Exporter, Watcher, Collector, Runner и Configurator.Exporter и Watcher определяют способ работы с данными: Exporter реализует pull модель, а Watcher — push.
plugin.Exporter
type Exporter interface {Export(key string, params []string, context ContextProvider) (result interface{}, err error)
}
Exporter — это простейший интерфейс, который выполняет опрос и возвращает значение, несколько значений, ошибку или же не возвращает ничего. Он принимает предобработанный ключ, параметры ключа и контекст. Для большинства плагинов этого достаточно. Замечу, что это единственный интерфейс, позволяющий конкурентный доступ. Все остальные интерфейсы имеют эксклюзивный доступ, и ни один другой метод не может работать параллельно, пока плагин выполняет какую-либо задачу.
Будьте внимательны, если созданный плагин реализует конкурентный доступ к данным. В этом случае нужно самим обеспечить правильный доступ нескольких потоков к разделяемым данным. Для этого в вашем распоряжении есть весь арсенал языка Go: мьютексы, каналы, атомарные счетчики, sync.Map и другие примитивы синхронизации. Не забывайте использовать race-детектор, чтобы обнаружить возможные состояния гонки.
Существует лимит на количество конкурентных запросов функции Export() — максимум 100 запросов на плагин. При необходимости этот лимит можно уменьшать для каждого плагина в отдельности, используя функцию plugin.Base.SetCapacity.
func (b *Base) SetCapacity(capacity int)
Кроме того, capacity можно установить с помощью одноимённого параметра в конфигурационном файле. Например:
Plugins.<PluginName>.Capacity=1
plugin.Watcher
type Watcher interface {Watch(requests []*Request, context ContextProvider)
}
Watcher позволяет плагину реализовать собственный процесс опроса метрик, не используя встроенный планировщик агента. Это может быть актуально для плагинов, использующих механизм trapping, которым нужен полный контроль над сбором и экспортом данных. Основной use case для интерфейса — ждать данные, и, по мере их поступления, отправлять результаты на сервер. Так, например, можно реализовать мониторинг логов или плагин, который подписывается на события от внешнего источника и ждёт, когда ему придут данные.
plugin.Collector
type Collector interface {Collect() error
Period() int
}
Collector используется в случаях, когда плагину необходимо собирать данные через регулярные интервалы времени. Он не умеет возвращать данные самостоятельно, поэтому для возврата его нужно использовать в связке с Exporter’ом.
Типичный use case для Collector — частый сбор данных и помещение их в кэш, где записи будут храниться до момента, пока их не запросит Zabbix сервер.
У Collector’а есть 2 функции:
- Collect реализует непосредственно логику сбора;
- Period устанавливает нужный интервал сбора.
Collector, к примеру, используется для сбора данных по процессору и дискам в самом агенте.
plugin.Runner
type Runner interface {Start()
Stop()
}
Runner предоставляет средства для выполнения инициализации, когда плагин активирован (функция Start), и деинициализации, когда плагин не используется и остановлен (функция Stop).
Реализовав этот интерфейс, плагин может, к примеру, запускать или останавливать какой-либо фоновый поток, освобождать неиспользуемые ресурсы, закрывать соединения и т.д.
Активация и деактивация плагина происходит в зависимости от наличия или отсутствия запросов (метрик) для обработки. В случае активных проверок, когда обновляется конфигурация с сервера (Zabbix Server или Proxy), планировщик получает новые задачи. Как только появится первая задача, предназначенная на выполнение нашему плагину, он активируется. Остановка произойдет, когда в конфигурации больше не остается запросов к плагину. В случае пассивных проверок, плагин активируется в момент, когда приходит запрос от сервера, и останавливается через 24 часа после поступления последнего запроса.
plugin.Configurator
type Configurator interface {Configure(globalOptions *GlobalOptions, privateOptions interface{})
Validate(privateOptions interface{}) error
}
Интерфейс Configurator нужен, чтобы предоставить возможность конфигурировать плагин.
Интерфейс имеет 2 функции:
- Configure загружает конфигурационные параметры в заданную нами структуру.
- Validate проверяет config файл на корректность. Если проверка не прошла, то агент не запустится, и мы сразу получим сообщение о проблемах.
Параметры конфигурации Go-агента по большей части совместимы с Zabbix агентом за несколькими исключениями.
Типы плагинов
Плагины бывают внутренние и внешние. Плагины, которые экспортируют внутренние данные агента — это, соответственно, внутренние плагины. Они располагаются в пакете internal/agent и имеют префикс "plugin_" в названии. К примеру, так реализован плагин, отвечающий за работу с UserParameters.Всё остальное, включая плагины, реализующие сбор стандартных метрик (таких как ЦПУ, сеть, диски, память и т.д.) — это внешние плагины. Созданные нами плагины будут работать наравне с ними и иметь такие же возможности. Располагаются внешние плагины в директории go/plugins, каждый в своём подкаталоге.
Hello, world!
Плагин — это обычный пакет Go, в котором реализован один или несколько интерфейсов, определяющих логику его поведения. Пример простейшего плагина:package packageName
import "zabbix.com/pkg/plugin"
type Plugin struct {
plugin.Base
}
var impl Plugin
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (res interface{}, err error) {
// Write your code here
return
}
func init() {
plugin.RegisterMetrics(&impl, "PluginName", "key", "Description.")
}
Выглядит не сложнее, чем скрипт на bash или python, правда? ⎝^ω^⎠ Осталось только добавить к нему код, который будет делать какую-то полезную работу.
Давайте немного попрактикуемся и попробуем написать плагин, возвращающий прогноз погоды для города, который мы передадим ему в качестве параметра ключа.
Для этого нам нужно сделать следующее:
Для начала скачаем исходные коды Zabbix.
$ git clone https://git.zabbix.com/scm/zbx/zabbix.git --depth 1 zabbix-agent2
$ cd zabbix-agent2
# Вы можете работать в master ветке, но я бы рекомендовал создать свою на основе одной из стабильных веток
$ git checkout -b feature/myplugin release/4.4
Создадим каталог src/go/plugins/weather и пустой файл weather.go в нём, который будет содержать наш код.
Далее, импортируем встроенный пакет "zabbix.com/pkg/plugin".
package weather
import "zabbix.com/pkg/plugin"
Определяем свою структуру, в которую встраиваем структуру Base из пакета plugin. Она понадобится нам в дальнейшем.
type Plugin struct {
plugin.Base
}
var impl Plugin
Теперь напишем код для получения и обработки данных. Всё, что нам нужно сделать, это:
- Выполнить GET запрос к API сервиса погоды (спасибо, wttr.in)
- Прочитать результат
- Обработать ошибки
- Вернуть результат.
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
if len(params) != 1 {
return nil, errors.New("Wrong parameters.")
}
// https://github.com/chubin/wttr.in
res, err := p.httpClient.Get(fmt.Sprintf("https://wttr.in/~%s?format=%%t", params[0]))
if err != nil {
return nil, err
}
temp, err := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
if err != nil {
return nil, err
}
return string(temp)[0 : len(temp)-4], nil
}
Следующий шаг — зарегистрировать метрики, чтобы агент узнал о них и смаршрутизировал их обработку к нашему плагину.
Воспользуемся функцией plugin.RegisterMetrics.
// impl — указатель на реализацию плагина
// name — имя плагина
// params — список метрик и описаний для них (key1, descr1, key2, descr2, keyN, descrN...)
func RegisterMetrics(impl Accessor, name string, params ...string)
Вызовем её из функции init (это произойдёт сразу при старте агента).
func init() {
plugin.RegisterMetrics(&impl, "Weather", "weather.temp", "Returns Celsius temperature.")
}
Одним вызовом этой функции мы могли бы зарегистрировать сразу несколько метрик, если бы они у нас были.
package plugins
import (
_ "zabbix.com/plugins/kernel"
_ "zabbix.com/plugins/log"
// ...
_ "zabbix.com/plugins/weather"
)
Кстати, на данный момент поддерживаются 3 платформы: linux, darwin и windows. В будущем, этот список, вероятно, будет расширен.
И последнее: чтобы рассказать агенту о существовании нашего плагина и подключить его при компиляции, нужно включить его в список импортов в файлы src/go/plugins/plugins_<platform>.go.
Сборка
Если у вас ещё не установлен Go, то сейчас самое время сделать это.Нам понадобится версия Go не ниже 1.13.
Чтобы собрать агент вместе с нашим плагином, нужно просто добавить опцию --enable-agent2 во время конфигурации и запустить make.
$ cd <zabbix-source>
$ ./bootstrap.sh; ./configure --enable-agent2 --enable-static; make
После окончания сборки можно запустить агент и проверить, как работает новый плагин. В данном случае мы передаем ему в качестве параметра “moscow” и получаем ответ.
$ <zabbix-source>/src/go/bin/zabbix_agent2 -t weather.temp[moscow]
+1
Собирать агент нужно только один раз. При дальнейшей разработке плагина, мы можем использовать команду go run, чтобы быстро проверить работу кода.
$ go run <zabbix-source>/src/go/cmd/zabbix_agent2/zabbix_agent2.go
Логирование
Если плагину требуется логирование, можно использовать функции из пакета zabbix.com/pkg/log: Tracef, Debugf, Warningf, Infof, Errf, Critf. Аналогичные функции содержит и наша структура Plugin, это обертки над функциями из пакета log. Разница лишь в том, что они добавляют префикс [<PluginName>] к сообщениям.Конфигурация плагина
Go-агент позволяет конфигурировать плагины на уровне конфигурационного файла агента. Для этого существует специальный параметр Plugins. По сути, он является не параметром вида ключ-значение, как все остальные параметры агента, а специальной секцией, где можно описывать специфичные параметры для каждого плагина в отдельности. В общем случае это выглядит так: Plugins.<PluginName>.<Parameter>=<Value>. Правила такие:- Желательно, чтобы имя плагина начиналось с заглавной буквы;
- Параметр должен начинаться с заглавной буквы;
- В именах параметров нельзя использовать спецсимволы;
- Уровень вложенности не ограничен;
- Параметров может быть сколь угодно много.
Допустим, мы хотим, чтобы Timeout имел допустимый диапазон от 1 до 30 секунд, был опциональным и по умолчанию (если не задан) равнялся бы глобальному таймауту агента.
Определим структуру, описывающую нашу конфигурацию.
type PluginOptions struct {
// Timeout is the maximum time for waiting when a request has to be done. Default value equals the global timeout.
Timeout int `conf:"optional,range=1:30"`
}
Если вы обратили внимание, мы описали допустимый диапазон и признак опциональности параметра с помощью метаданных в тэге conf. Агент умеет использовать эти данные при чтении конфига.
Метаданные имеют такой формат: [name=<name>,][optional,][range=<range>,][default=<default value>], где:
- <name> — имя параметра (если имя параметра в конфиге отличается от имени поля структуры);
- optional — установите, если параметр должен быть опциональным;
- <range> — допустимый диапазон <min>:<max>, где значения <min> и <max> опциональны;
- <default value> — значения параметры по умолчанию. Всегда должен быть указан последним.
type Plugin struct {
plugin.Base
options PluginOptions
httpClient http.Client
}
Реализуем интерфейс Configurator. Как мы помним, у него 2 метода: Configure и Validate.
func (p *Plugin) Configure(global *plugin.GlobalOptions, privateOptions interface{}) {
if err := conf.Unmarshal(privateOptions, &p.options); err != nil {
p.Errf("cannot unmarshal configuration options: %s", err)
}
// Set default value
if p.options.Timeout == 0 {
p.options.Timeout = global.Timeout
}
p.httpClient = http.Client{Timeout: time.Duration(p.options.Timeout) * time.Second}
}
func (p *Plugin) Validate(privateOptions interface{}) error {
// Nothing to validate
return nil
}
Вызовом функции conf.Unmarshal загружаем параметры плагина в заданную нами структуру.
Заменим вызов http.Get на p.httpClient.Get.
res, err := p.httpClient.Get(fmt.Sprintf("https://wttr.in/~%s?format=%%t", params[0]))
if err != nil {
if err.(*url.Error).Timeout() {
return nil, errors.New("Request timeout.")
}
return nil, err
}
Наконец, мы можем добавить наш параметр в конфигурационный файл агента:
Plugins.Weather.Timeout=1
Теперь если таймаут будет превышен, плагин должен выдать ошибку.
Но что, если мы введём какое-то недопустимое значение и запустим агент? Вы можете проверить — агент просто запустится и даже не выругается. Timeout будет установлен в default, т.е. будет равен глобальному таймауту.
Предупреждение в логе появится лишь в момент первого обращения к плагину (только тогда он активируется, и будут вызваны методы Validate и Configure).
Это не совсем то поведение, которое нам нужно. Пожалуй, было бы правильнее, если бы агент падал, в случае, когда конфиг некорректен. Для этого достаточно доработать метод Validate. Он вызывается при старте агента для всех плагинов, которые его реализуют.
func (p *Plugin) Validate(privateOptions interface{}) error {
var opts PluginOptions
return conf.Unmarshal(privateOptions, &opts)
}
Теперь если мы введём ошибочное значение параметра, то уже при запуске агента получим ошибку, подобную этой: "cannot create scheduling manager: invalid plugin Weather configuration: Cannot assign configuration: invalid parameter Plugins.Weather.Timeout at line 411: value out of range".
В следующих версиях агента будет добавлена возможность реконфигурации плагинов "на лету". При получении соответствующей runtime команды будут вызываться методы Validate и Configure, и плагин будет иметь возможность реагировать на них и обновлять свои настройки. Будьте внимательны, если вы создаёте какие-то горутины прямо из Configure — вы можете столкнуться с тем, что при реконфигурации будут запускаться всё новые и новые экземпляры этих горутин. Возможно, стоит вынести их запуск и остановку в методы Start и Stop (интерфейс Runner).
Полный исходный код плагина можно взять здесь: https://github.com/VadimIpatov/zabbix-weather-plugin.
Пример посложней
Мы разобрались как писать Exporter плагины. Это действительно очень просто. Давайте теперь попробуем реализовать плагин, использующий интерфейсы Collector и Runner.Долой синтетические примеры! Напишем что-нибудь полезное. Пусть это будет плагин, поэтапно замеряющий время выполнения HTTP запроса и вычисляющий перцентили на основе собранной статистики.
Для начала реализуем метод сбора данных. Для этого воспользуемся пакетом "net/http/httptrace" (был представлен в Go 1.7).
type timeSample struct {
DnsLookup float64 `json:"dnsLookup"`
Connect float64 `json:"connect"`
TlsHandshake float64 `json:"tlsHandshake"`
FirstResponseByte float64 `json:"firstResponseByte"`
Rtt float64 `json:"rtt"`
}
func (p *Plugin) measureTime(url string) (timeSample, error) {
var (
sample timeSample
start, connect, dns, tlsHandshake time.Time
)
req, _ := http.NewRequest("GET", url, nil)
trace := &httptrace.ClientTrace{
DNSStart: func(_ httptrace.DNSStartInfo) {
dns = time.Now()
},
DNSDone: func(_ httptrace.DNSDoneInfo) {
sample.DnsLookup = float64(time.Since(dns) / time.Millisecond)
},
ConnectStart: func(_, _ string) {
connect = time.Now()
},
ConnectDone: func(net, addr string, err error) {
if err != nil {
p.Errf("unable to connect to host %s: %s", addr, err.Error())
}
sample.Connect = float64(time.Since(connect) / time.Millisecond)
},
GotFirstResponseByte: func() {
sample.FirstResponseByte = float64(time.Since(start) / time.Millisecond)
},
TLSHandshakeStart: func() {
tlsHandshake = time.Now()
},
TLSHandshakeDone: func(_ tls.ConnectionState, _ error) {
sample.TlsHandshake = float64(time.Since(tlsHandshake) / time.Millisecond)
},
}
ctx, cancel := context.WithTimeout(req.Context(), time.Duration(p.options.Timeout)*time.Second)
defer cancel()
req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
start = time.Now()
if _, err := http.DefaultTransport.RoundTrip(req); err != nil {
return timeSample{}, err
}
sample.Rtt = float64(time.Since(start) / time.Millisecond)
return sample, nil
}
Для вычисления перцентилей нам потребуется где-то хранить собранные данные. Для этой цели нам нужна циклическая очередь (Ring Buffer). Чтобы не усложнять наш пример, воспользуемся готовым решением — github.com/VadimIpatov/gcircularqueue. Это далеко не самая эффективная реализация, зато она позволит сохранить читаемость кода. Для вычисления перцентилей тоже воспользуемся силой Open Source и богатством экосистемы Go — я остановился на пакете github.com/montanaflynn/stats. Теперь мы можем описать структуры для хранения данных.
type Plugin struct {
plugin.Base
urls map[string]*urlUnit
sync.Mutex
options Options
}
type urlUnit struct {
url string
history *gcircularqueue.CircularQueue
accessed time.Time // last access time
modified time.Time // data collect time
}
Для инициализации и очистки ресурсов используем методы Start и Stop интерфейса Runner.
func (p *Plugin) Start() {
p.urls = make(map[string]*urlUnit)
}
func (p *Plugin) Stop() {
p.urls = nil
}
Сбор данных реализуем при помощи интерфейса Collector.
func (p *Plugin) Collect() (err error) {
now := time.Now()
p.Lock()
for key, url := range p.urls {
if now.Sub(url.accessed) > maxInactivityPeriod {
p.Debugf("removed expired url %s", url.url)
delete(p.urls, key)
continue
}
res, err := p.measureTime(url.url)
if err != nil {
p.Errf(err.Error())
continue
}
url.history.Push(res)
if url.history.IsFull() {
_ = url.history.Shift()
}
url.modified = now
}
p.Unlock()
return
}
func (p *Plugin) Period() int {
return p.options.Interval
}
Здесь мы в цикле бежим по списку URL (нам ещё предстоит его наполнить), для каждого из которых вызываем метод p.measureTime(url.url) и помещаем результат в буфер. Чтобы сделать точную привязку данных к времени, мы сохраняем время опроса в url.modified.
Так же мы удаляем те URL из списка, по которым давно не было обращений.
Как вы помните, Collector не умеет экспортировать данные. Нам нужен Exporter.
func (p *Plugin) Export(key string, params []string, ctx plugin.ContextProvider) (result interface{}, err error) {
if len(params) != 1 {
return nil, errors.New("Wrong parameters.")
}
url, err := parseURL(params[0])
if err != nil {
return nil, err
}
switch key {
case keyHttpTraceStats:
if _, ok := p.urls; !ok { p.urls[url] = &urlUnit{ url...thub.com/VadimIpatov/zabbix-httptrace-plugin.
Мониторинг мониторинга
У агента есть runtime команда metrics, которая показывает состояние всех созданных плагинов и их текущую нагрузку. Иногда это может оказаться полезным.Использовать её очень просто:
$ zabbix_agent2 -R metrics
...
[Weather]
active: true
capacity: 0/100
tasks: 0
weather.temp: Returns Celsius temperature.
...
Эту информацию можно получить и другим способом — по HTTP. Для этого в конфиге агента нужно задать параметр StatusPort=, перезапустить агент и направить браузер на адрес http://<ZabbixAgentHost>:<Port>/status.
Что дальше?
А дальше мы планируем активно развивать агент. Расскажу немного о функционале, который может появиться в будущем:- Реализация загружаемых плагинов в виде динамических библиотек (чтобы не приходилось каждый раз пересобирать весь агент целиком).
- Обновление конфигурации в рантайме, т.е. без перезапуска агента.
- Больше полезных плагинов от команды Zabbix. Например, совсем скоро мы зарелизим мониторинг Docker’а и Mysql.
Мне мало!
Для тех, кто хочет глубже погрузиться в тему, я сделал подборку полезных ссылок:Templates guidelines — здесь мы собрали лучшие практики и свои рекомендации по разработке качественных шаблонов.
An official guide to making and managing great templates — презентация с последнего Zabbix Summit на эту же тему.
Magic of the new zabbix agent — презентация Zabbix Agent 2.
Официальная документация по Zabbix Agent 2.
Больше примеров кода вы найдёте в исходных кодах Zabbix Agent 2 (наш репозиторий тут: git.zabbix.com). Здесь можно посмотреть, как реализованы стандартные проверки.
Исходный код плагина Weather.
Исходный код плагина HttpTrace.
Writing watcher Zabbix Agent2 MQTT plugin in Go — отличный пример использования Watcher интерфейса.
Если вдруг вы ещё не знакомы с языком Go, обратите внимание на "Маленькую книгу о Go" и, конечно, пройдите официальный A Tour of Go ʕ☉Ѡ☉ʔ