2. Elastic stack: анализ security логов. Logstash

Kate

Administrator
Команда форума
В прошлой статье мы познакомились со стеком ELK, из каких программных продуктов он состоит. И первая задача с которой сталкивается инженер при работе с ELK стеком это отправление логов для хранения в elasticsearch для последующего анализа. Однако, это просто лишь на словах, elasticsearch хранит логи в виде документов с определенными полями и значениями, а значит инженер должен используя различные инструменты распарсить сообщение, которое отправляется с конечных систем. Сделать это можно несколькими способами — самому написать программу, которая по API будет добавлять документы в базу либо использовать уже готовые решения. В рамках данного курса мы будем рассматривать решение Logstash, которое является частью ELK stack. Мы посмотрим как можно отправить логи с конечных систем в Logstash, а затем будем настраивать конфигурационный файл для парсинга и перенаправления в базу данных Elasticsearch. Для этого в качестве входящей системы берем логи с межсетевого экрана Check Point.
В рамках курса не рассматривается установка ELK stack, так как существует огромное количество статей на эту тему, будем рассматривать конфигурационную составляющую.

Составим план действий по конфигурации Logstash:

  1. Проверка что elasticsearch будет принимать логи (проверка работоспособности и открытости порта).
  2. Рассматриваем каким образом мы можем отправить события в Logstash, выбираем способ, реализуем.
  3. Настраиваем Input в конфигурационном файле Logstash.
  4. Настраиваем Output в конфигурационном файле Logstash в режиме дебага, для того чтобы понять как выглядит лог сообщение.
  5. Настраиваем Filter.
  6. Настраиваем корректный Output в ElasticSearch.
  7. Запускам Logstash.
  8. Проверяем логи в Kibana.

Рассмотрим более детально каждый пункт:

Проверка что elasticsearch будет принимать логи​


Для этого можно проверить командой curl доступ в Elasticsearch из системы, на которой развернут Logstash. Если у вас настроена аутентификация, то также через curl передаем пользователь/пароль, указываем порт 9200 если вы его не меняли. Если приходит ответ наподобие как указано ниже, значит все в порядке.

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
"name" : "elastic-1",
"cluster_name" : "project",
"cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
"version" : {
"number" : "7.4.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
"build_date" : "2019-10-22T17:16:35.176724Z",
"build_snapshot" : false,
"lucene_version" : "8.2.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$


Если ответ не пришел, то может быть несколько видов ошибок: не работает процесс elasticsearch, указан не тот порт, либо порт заблокирован фаерволом на сервере, где стоит elasticsearch.

Рассматриваем как можно отправить логи в Logstash с межсетевого экрана check point​


С Check Point management server можно отправить логи в Logstash через syslog, используя утилиту log_exporter, более подробно можно ознакомиться с ней по данной статье, здесь оставим только команду которая создает поток:

cp_log_export add name check_point_syslog target-server <<ip_address_logstash>> target-port 5555 protocol tcp format generic read-mode semi-unified

<<ip_address_logstash>> — адрес сервера на котором крутится Logstash, target-port 5555 — порт на который будем отправлять логи, отправление логов по tcp может нагрузить сервер, поэтому в некоторых случаях более правильно ставить udp.

Настраиваем INPUT в конфигурационном файле Logstash​


kjnsqhfoxgr7fqtku2tpgibgh5y.png


По умолчанию конфигурационный файл находится в директории /etc/logstash/conf.d/. Конфигурационный файл состоит из 3 осмысленных частей: INPUT, FILTER, OUTPUT. В INPUT мы указываем откуда система будет брать логи, в FILTER парсим лог — настраиваем как поделить сообщение на поля и значения, в OUTPUT настраиваем выходящий поток — куда распарсенные логи будут отправляться.

Сначала настроим INPUT, рассмотрим некоторые типы которые могут быть — file, tcp и exe.

Tcp:

input {
tcp {
port => 5555
host => “10.10.1.205”
type => "checkpoint"
mode => "server"
}
}


mode => «server»
Говорит о том что Logstash принимает соединения.

port => 5555
host => “10.10.1.205”
Принимаем соединения по IP адресу 10.10.1.205 (Logstash), порт 5555 — порт должен быть разрешен политикой фаервола.

type => «checkpoint»
Маркируем документ, очень удобно в случае если у вас несколько входящих соединений. В последующем для каждого соединения можно написать свой фильтр с помощью логической конструкции if.

File:

input {
file {
path => "/var/log/openvas_report/*"
type => "openvas"
start_position => "beginning"
}
}


Описание настроек:
path => "/var/log/openvas_report/*"
Указываем директорию, файлы в которой необходимо считать.

type => «openvas»
Тип события.

start_position => «beginning»
При изменении файла считывает файл целиком, если выставить «end» то система ждет появления новых записей в конце файла.

Exec:

input {
exec {
command => "ls -alh"
interval => 30
}
}


По данному инпуту запускается (только!) shell команда и ее вывод оборачивается в лог сообщение.

command => «ls -alh»
Команда, вывод которой нас интересует.

interval => 30
Интервал вызова команды в секундах.

Для того чтобы принимать логи с межсетевого экрана прописываем фильтр tcp или udp, в зависимости от того как отправляются логи в Logstash.

Настраиваем Output в конфигурационном файле Logstash в режиме дебага, для того чтобы понять как выглядит лог сообщение​


После того как мы сконфигурировали INPUT, необходимо понять как будет выглядеть лог сообщение, какие методы необходимо использовать для настройки фильтра(парсера) логов.

Для этого будем использовать фильтр который выдает результат в stdout для того чтобы просмотреть исходное сообщение, полностью конфигурационный файл на текущий момент будет выглядеть так:

input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}

output
{
if [type] == "checkpoint"
{
stdout { codec=> json }
}
}


Запускам команду для проверки:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
Видим результат, картинка кликабельна:



Если скопировать это будет выглядеть так:

action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}\" origin=\"10.10.10.254\" originsicname=\"CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb\" sequencenum=\"8\" time=\"1576766483\" version=\"5\" context_num=\"1\" dst=\"10.10.10.10\" dst_machine_name=\"ts-spb-dc-01@tssolution.local\" layer_name=\"TSS-Standard Security\" layer_name=\"TSS-Standard Application\" layer_uuid=\"dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0\" layer_uuid=\"dbee3718-cf2f-4de0-8681-529cb75be9a6\" match_id=\"8\" match_id=\"33554431\" parent_rule=\"0\" parent_rule=\"0\" rule_action=\"Accept\" rule_action=\"Accept\" rule_name=\"Implicit Cleanup\" rule_uid=\"6dc2396f-9644-4546-8f32-95d98a3344e6\" product=\"VPN-1 & FireWall-1\" proto=\"17\" s_port=\"37317\" service=\"53\" service_id=\"domain-udp\" src=\"10.10.1.180\" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time=\"1576766483\" action=\"Accept\" conn_direction=\"Internal\" contextnum=\"1\" ifdir=\"outbound\" ifname=\"bond1.101\" logid=\"0\" loguid=\"{0x5dfb8c13,


Смотря на данные сообщения, понимаем что логи имеют вид: поле = значение или key = value, а значит подходит фильтр, который называется kv. Для того чтобы правильно выбрать фильтр к каждому конкретному случаю было бы неплохо с ними ознакомиться в технической документации, либо спросить друга.

Настраиваем Filter​


На прошлом этапе выбрали kv, далее представлена конфигурация этого фильтра:

filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}


Выбираем символ по которому будем делить поле и значение — “=”. Если у нас есть в логе одинаковые записи, сохраняем в базе только один экземпляр, иначе у вас получится массив из одинаковых значений, то есть если имеем сообщение “foo = some foo=some” записываем только foo = some.

Настраиваем корректный Output в ElasticSearch​


После того как настроен Filter можно выгружать логи в базу elasticsearch:

output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}


Если документ подписан типом checkpoint, сохраняем событие в базу elasticsearch, которая принимает соединения на 10.10.1.200 на порт 9200 по дефолту. Каждый документ сохраняется в определенный индекс, в данном случае сохраняем в индекс «checkpoint-» + текущая временная дата. Каждый индекс может обладать определенным набором полей, либо создается автоматически при появлении нового поля в сообщении, настройки полей и их тип можно посмотреть в mappings.

Если у вас настроена аутентификация ( рассмотрим позже), обязательно должны быть указаны креды для записи в конкретный индекс, в данном примере это «tssolution» с паролем «cool». Можно разграничить права пользователей для записи логов только в определенный индекс и никакие больше.

Запускаем Logstash.​


Конфигурационный файл Logstash:

input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}

filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}

output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}


Проверяем конфигурационный файл на правильность составления:
/usr/share/logstash/bin//logstash -f checkpoint.conf
lb1vw57djb7xm_lohxwbvzuygwy.png


Запускаем процесс Logstash:
sudo systemctl start logstash

Проверяем что процесс запустился:
sudo systemctl status logstash

nhugjwysxvwiras5r34w8ajqlxc.png


Проверяем поднялся ли сокет:
netstat -nat |grep 5555

n4xnjgwriabmksyhvdfbskgkeau.png


Проверяем логи в Kibana.​


После того как все запущено, заходим в Kibana — Discover, убеждаемся что все настроено правильно, картинка кликабельна!



Все логи на месте и мы можем видеть все поля и их значения!

Заключение​


Мы рассмотрели как писать конфигурационный файл Logstash, в результате получили парсер всех полей и значений. Теперь мы можем работать с поиском и составлением графиков по определенным полям. Далее в курсе мы рассмотрим визуализацию в Kibana, создадим простенький дашбоард. Стоит упомянуть что конфигурационный файл Logstash требуется постоянно дописывать в определенных ситуациях, например, когда хотим заменить значение поля с цифры на слово. В последующих статьях мы будем постоянно это делать.

Источник статьи: https://habr.com/ru/company/tssolution/blog/481960/
 
Сверху