Создание карты подключений Elasticsearch + Fluent Bit + Nginx Ingress Controller

Kate

Administrator
Команда форума

Разработка плана действий​

Передо мной стояла задача создать карту подключений пользователей по миру к нашим серверам для использовании статистических данных при оптимизации размещения новых мощностей и распределения общей нагрузки. Из инструментов сборов логов рассматривались Loki, Logstash, Fluent Bit. В итоге был выбран Fluent Bit из-за его относительно легкой настройки, оптимизации и наличия возможности написания собственных скриптов для агрегации на Lua. Получение геоданных из IP предполагается посредством использования баз данных для GeoIP2 от MaxMind.

Выявление ошибок и оптимизация​

Первоначально, сбор логов Nginx происходил в стандартном формате с помощью регулярного выражения. Позже, решено было изменить формат логов Nginx на JSON, изначально структурированный формат, что позволило собирать логи более точно.

Однако, спустя какое-то время была выявлена еще одна проблема, касающаяся данных в полях upstream_*. Так как вышестоящих серверов может быть несколько, Nginx складывает данные по каждому из них в соответствующие поля через запятую.

В ходе продумывания плана действий, обращение к базе данных GeoIP2 планировалось на стороне Fluent Bit через встроенный фильтр. В последствие было принято решение отказаться от данной идеи и перенести получение геоданных на сторону Elasticsearch, что позволило сэкономить трафик и оптимизировать агрегацию данных.

Установка и настройка Fluent Bit​

Установка Fluent Bit производилась с помощью Helm чарта. Содержимое файла values.yaml представлено ниже. Также был написан скрипт на Lua для разделения полей логов Nginx, идущих через запятую.

values.yaml
kind: DaemonSet

serviceAccount:
create: true

rbac:
create: true
nodeAccess: false
eventsAccess: false

podSecurityPolicy:
create: false

openShift:
enabled: false

hostNetwork: false

service:
type: ClusterIP
port: 2020

serviceMonitor:
enabled: true

dashboards:
enabled: false

resources:
limits:
cpu: 100m
memory: 200Mi
requests:
cpu: 50m
memory: 50Mi

flush: 1
logLevel: info
metricsPort: 2020

luaScripts:
field_filter.lua: |
<field_filter.lua>

config:
service: |
[SERVICE]
Daemon Off
Flush {{ .Values.flush }}
Log_Level {{ .Values.logLevel }}
Parsers_File /fluent-bit/etc/parsers.conf
Parsers_File /fluent-bit/etc/conf/custom_parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port {{ .Values.metricsPort }}
Health_Check On

inputs: |
[INPUT]
Name tail
Tag ingress-nginx-controller.*
Path /var/log/containers/*ingress-nginx-controller*.log
parser cri
DB /var/log/flb_ingress-nginx-controller.db
Mem_Buf_Limit 128MB
Skip_Long_Lines On

filters: |
[FILTER]
Name parser
Match ingress-nginx-controller.*
Key_Name message
Parser ingress_nginx_controller

[FILTER]
Name Lua
Match ingress-nginx-controller.*
type_array_key upstream_addr upstream_response_time upstream_response_length upstream_status
script /fluent-bit/scripts/field_filter.lua
call field_filter

outputs: |
[OUTPUT]
Name es
Match ingress-nginx-controller.*
Host elasticsearch-proxy.fluent-bit.svc.cluster.local
Port 9200
Index ingress-nginx-controller
HTTP_User <user>
HTTP_Passwd <password>
compress gzip
tls Off
tls.verify Off
Trace_Error On
Suppress_Type_Name On
Buffer_Size 2MB

customParsers: |
[PARSER]
Name ingress_nginx_controller
Format json
Time_Keep Off
Time_Key time
Time_Format %Y-%m-%dT%H:%M:%S%z
Скрипт для разделения полей на Lua. В зависимости от возвращаемого кода запись может быть изменена (код 2), отброшена (код -1) или остаться неизменной (код 0).
field_filter.lua
function field_filter(tag, timestamp, record)
code = 0
if record.x_forward_for == nil or record.server_addr == nil then
code = -1
return code, timestamp, record
end
function split_values(v, delimiter)
result = {}
for match in (v):gmatch("([^" .. delimiter .. "\\s][^\\" .. delimiter .. "]*[^" .. delimiter .. "\\s]*)") do
if match == "-" then
match = -1
end
table.insert(result, match)
end
return result
end
function process_value(v)
code = 0
if v ~= nil then
result = split_values(v, ", ")
code = 2
end
return code, result
end
code, record.upstream_addr = process_value(record.upstream_addr)
code, record.upstream_response_time = process_value(record.upstream_response_time)
code, record.upstream_response_length = process_value(record.upstream_response_length)
code, record.upstream_status = process_value(record.upstream_status)
return code, timestamp, record
end

Установка и настройка Elasticsearch​

Установка Elasticsearch была выполнена на виртуальной машине из официальных репозиториев. Пройдены стандартные процедуры по установке SSL сертификатов на Kibana, заведение системного пользователя для Fluent Bit. После запуска системы было произведено тестовое наполнение данными из Fluent Bit с автоматическим созданием индекса. Как и ожидалось, некоторые поля индекса имели тип Text и не соотвествовали типам реальных данных. После ручной правки типов данных индекс приобрел таковую структуру.
index_mappings.json
{
"mappings": {
"dynamic": "false",
"dynamic_templates": [],
"properties": {
"@timestamp": {
"type": "date"
},
"bytes_sent": {
"type": "long"
},
"client_geo_ip": {
"dynamic": "false",
"properties": {
"city_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"continent_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"country_iso_code": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"country_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"location": {
"type": "geo_point",
"ignore_malformed": false,
"ignore_z_value": true
},
"region_iso_code": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"region_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"duration": {
"type": "float"
},
"http_referrer": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"http_user_agent": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"method": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"path": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"remote_addr": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"remote_user": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"request_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"request_length": {
"type": "long"
},
"request_proto": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"request_query": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"request_time": {
"type": "float"
},
"server_addr": {
"type": "ip",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"server_geo_ip": {
"dynamic": "false",
"properties": {
"city_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"continent_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"country_iso_code": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"country_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"location": {
"type": "geo_point"
},
"region_iso_code": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"region_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
},
"status": {
"type": "short",
"ignore_malformed": false,
"coerce": true
},
"upstream_addr": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"upstream_response_length": {
"type": "long",
"ignore_malformed": false,
"coerce": true
},
"upstream_response_time": {
"type": "float",
"ignore_malformed": false,
"coerce": true
},
"upstream_status": {
"type": "short",
"ignore_malformed": false,
"coerce": true
},
"vhost": {
"type": "text",
"fields": {
"keyword": {
"type": "wildcard",
"ignore_above": 256
}
}
},
"x_forward_for": {
"type": "ip",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
Перед попаданием в индекс, данные проходят дополнительную стадию обработки (Ingest Pipeline) для извлечения геоданных по адресам клиента и сервера из полей x_forward_for и server_addr по базе данных GeoIP2. Эта информация хранится объектах client_geo_ip и server_geo_ip соответственно.

Финальный результат​

После настройки всех необходимых систем и сбора некоторого количества данных были настроены визуализации и дешборды для более понятного представления общей картины. Одной из таких визуализация является тепловая карта подключений пользователей, представленная на рисунке ниже.
Тепловая карта подключений пользователей по миру

Тепловая карта подключений пользователей по миру

 
Сверху