Создание телеграмм-бота (Spring Boot, Kafka, PostgreSQL), часть первая

Kate

Administrator
Команда форума

Рецепт по приготовлению своего «Telegram-Франкенштейна»​

Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри Шелли
Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри Шелли
Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.

Статьи будут разделены на 2 части, первая часть - создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть - обработка всех логов (Kafka Consumer).

Ингредиенты:​

  1. Регистрация бота
  2. Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
  3. Kafka (для отслеживания топиков я использую Conductor)
  4. PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Прошу пишите в коментариях возникшие проблемы, на всякий случай - вот мой git

Начинаем с нарезки:​

Первостепенно нужно настроить build.grable со всеми зависимостями

build.grable
buildscript {
repositories {
mavenCentral()
}
}

plugins {
id 'org.springframework.boot' version '2.4.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}

apply from: 'build-test.gradle'

group 'com.sercetary.bot'
sourceCompatibility = '14'

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
}

configurations.all {
exclude module: 'slf4j-log4j12'
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
implementation 'org.springframework.data:spring-data-commons:2.6.0'
implementation 'org.springframework.kafka:spring-kafka:2.7.6'
implementation 'org.postgresql:postgresql:42.3.1'
implementation 'com.h2database:h2:1.4.200'

implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'

compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer

application.yml
server:
port: 9000
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Теперь настройки application.properties

application.properties
# HTTP port for incoming requests
server.port=8081

app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me

# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10

# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me
Хорошо, после настроек нашего проекта, давайте обговорим его структуру:

Структура проекта
Структура проекта
Пакеты:

  • config - описание бинов и конфигурации проекта
  • controller - обрабатывает запрос пользователя
  • dto - хранит данные, а так же описывает модель таблицы БД
  • exceptions - кастомный пакет обработчика ошибок
  • repository - логика работа с БД
  • service - основная бизнес логика проекта

Сейчас мы собираем игредиенты и маринуем:​

Настройки бинов:​

- Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper

AppConfig
@Configuration
public class AppConfig {

@Bean
ObjectMapper customObjectMapper() {
return new ObjectMapper();
}

@Bean
TelegramBotsApi telegramBotsApi() throws TelegramApiException{
return new TelegramBotsApi(DefaultBotSession.class);
}
}
- Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {

@Bean
@Qualifier("bot-db")
@ConfigurationProperties(prefix = "app.db.bot-db")
SpringDataJdbcProperties gitlabJdbcProperties() {
return new SpringDataJdbcProperties();
}

@Bean
@Qualifier("bot-db")
public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
return hikariDataSource("db", properties);
}

@Bean
@Qualifier("bot-db")
JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}

@Data
@NoArgsConstructor
public static class SpringDataJdbcProperties {

// constants
private static final String H2_DATABASE_DRIVER = "org.h2.Driver";

/**
* JDBC URL property
*/
String url;
/**
* JDBC driver class name property
*/
String driver;
/**
* JDBC username property
*/
String user;
/**
* JDBC password property
*/
String password;
/**
* Hikari / Vertica maxPoolSize property
*/
String poolSize;
/**
* Minimum pool size
*/
int minPoolSize = 4;
/**
* Maximum pool size
*/
int maxPoolSize = 10;
/**
* This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
* sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
*/
long idleTimeout;
/**
* This property controls the maximum lifetime of a connection in the pool. When a connection
* reaches this timeout, even if recently used, it will be retired from the pool.
* An in-use connection will never be retired, only when it is idle will it be removed
*/
long maxLifetime;
/**
* Bulk insert size
*/
Integer bulkSize;


/**
* All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
*
* @param url JDBC driver class name property
* @param driver JDBC driver class name property
* @param user JDBC username property
* @param password JDBC password property
* @param poolSize Hikari / Vertica maxPoolSize property
* @param bulkSize bulk insert size
*/
public SpringDataJdbcProperties(
String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
this.url = url;
this.driver = driver;
this.user = user;
this.password = password;
this.poolSize = poolSize;
this.bulkSize = bulkSize;
}


/**
* Возвращает истину, если экземпляр описывает in-memory H2 database
*
* @return истина, если экземпляр описывает in-memory H2 database
*/
public boolean isH2Database() {
return driver.equals(H2_DATABASE_DRIVER);
}

/**
* Возвращает строковое представление экземпляра объекта в формате JSON
*
* @return строковое представление экземпляра объекта в формате JSON
*/
@Override
public String toString() {
var props = new SpringDataJdbcProperties(
url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
return Json.encode(props);
}

}

}
- Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig
@Slf4j
class DefaultDbConfig {

protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
log.info("[{}] настройки БД: [{}]", tag, properties.toString());

HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(properties.getUrl());
ds.setDriverClassName(properties.getDriver());
ds.setUsername(properties.getUser());
ds.setPassword(properties.getPassword());
ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
return ds;
}
}
- После напишем утилитный класс для логирования

Json
public class Json {
static final ObjectMapper mapper = new ObjectMapper();

/**
* Encode instance as JSON
*
* @param obj instance
* @return JSON
*/
public static String encode(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return obj.toString();
}
}

public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
return mapper.readValue(json, clazz);
}

Далее мы напишем контроллер, для доступа к сервису из вне​

- Создаем простенький контроллер, для получения списка записей из БД

UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {

private final UserService userService;

/**
* Возвращает список пользователей и связанных с ними планами
*/
@RequestMapping(path = "/users_idea", method = RequestMethod.GET)
public List<User> getIdeaList() {
log.debug("Method - getIdeaList was called");
return userService.getUserList();
}
}

После переходим к созданию модели​

- Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице

User
@Data
@RequiredArgsConstructor
public class User {
/**
* user's id
*/
@JsonProperty("id")
private final int id;
/**
* user's name
*/
@JsonProperty("name")
private final String name;
/**
* description
*/
@JsonProperty("description")
private final String description;

private String startWord = "";

@Override
public String toString() { return startWord + description; }
}
UserMapper
@Slf4j
public class UserMapper implements RowMapper<User> {

@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
var entity = new User(
rs.getInt("id"),
rs.getString("user_name"),
rs.getString("description")
);
log.trace("mapRow(): entity = [{}]", entity);
return entity;
}
}

Переходим к созданию кастомных exception​

Для чего они нужны
Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.
- BaseException - класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра - сообщение и тело ошибки

BaseException
@Slf4j
public class BaseException extends RuntimeException{

public BaseException(String msg, Throwable t) {
super(msg, t);
log.error(msg, t);
}

public BaseException(String msg) {
super(msg);
log.error(msg);
}

}
- NotFoundException - класс, который вывзывается, когда ответ не найден, наследуется от BaseException

NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {

private final static String MESSAGE = "Not Found";

public NotFoundException(Throwable t) {
super(MESSAGE, t);
}

public NotFoundException() {
super(MESSAGE);
}
}
- DbException - класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException

DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {

private static final String MESSAGE = "Ошибка БД";

public DbException(String message) {
super(message);
}

public DbException(Throwable cause) {
super(MESSAGE, cause);
}
}

Теперь для работы с БД, создаем repository​

- Создадим интерфейс, который описывает методы, для работы с записями в БД

IUserRepository
public interface IUserRepository {

/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
User getById(int id);

/**
* Возвращает список записей
*
* @return список всех записей
* @throws DbException в случае ошибки БД
*/
List<User> getUserList();

/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
void insert(User entity);

/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
void delete(User entity);
}
- Теперь напишем класс, который реализует методы интерфейса

UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {

// constants
private static final String SQL_SELECT_BY_NAME = "" +
"SELECT id, user_name, description FROM user_table WHERE id=?";
private static final String SQL_SELECT_LIST = "" +
"SELECT id, user_name, description FROM user_table";
private static final String SQL_INSERT = "" +
"INSERT INTO user_table (user_name, description) VALUES (?, ?)";
private static final String SQL_DELETE = "" +
"DELETE FROM user_table WHERE id = ?";

protected final static UserMapper USER_MAPPER = new UserMapper();

// beans
protected final JdbcTemplate template;


/**
* Req-args constructor for Spring DI
*/
public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
this.template = template;
}

/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public User getById(int id) throws DbException {
try {
return DataAccessUtils.singleResult(
template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}

/**
* Возвращает список записей
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public List<User> getUserList() throws DbException {
try {
return template.query(SQL_SELECT_LIST, USER_MAPPER);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}

/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void insert(User entity) throws DbException {
try {
// В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
var result = template.update(SQL_INSERT,
entity.getName(),
entity.getDescription());
if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
log.info("insert({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}

/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void delete(User entity) throws DbException {
try {
var result = template.update(SQL_DELETE, entity.getId());
if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
log.info("delete({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
}
- Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД

TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {

private Message requestMessage = new Message();
private final SendMessage response = new SendMessage();
private final Producer producerService;
private final UserService userService;

private final String botUsername;
private final String botToken;

public TelegramBot(
TelegramBotsApi telegramBotsApi,
@Value("${telegram-bot.name}") String botUsername,
@Value("${telegram-bot.token}") String botToken,
Producer producerService, UserService userService) throws TelegramApiException {
this.botUsername = botUsername;
this.botToken = botToken;
this.producerService = producerService;
this.userService = userService;

telegramBotsApi.registerBot(this);
}

/**
* Этот метод вызывается при получении обновлений через метод GetUpdates.
*
* @param request Получено обновление
*/
@SneakyThrows
@Override
public void onUpdateReceived(Update request) {
requestMessage = request.getMessage();
response.setChatId(requestMessage.getChatId().toString());

var entity = new User(
0, requestMessage.getChat().getUserName(),
requestMessage.getText());

if (request.hasMessage() && requestMessage.hasText())
log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());

if (requestMessage.getText().equals("/start"))
defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
else if (requestMessage.getText().equals("/idea"))
onIdea(response);
else
defaultMsg(response, "Я записал вашу мысль :) \n ");

log.info("Working, text[{}]", requestMessage.getText());

if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
}

/**
* Метод отправки сообщения со списком мыслей - по команде "/idea"
*
* @param response - метод обработки сообщения
*/
private void onIdea(SendMessage response) throws TelegramApiException {
if (userService.getUserList().isEmpty()) {
defaultMsg(response, "В списке нет мыслей. \n");
} else {
defaultMsg(response, "Вот список ваших мыслей: \n");
for (User txt : userService.getUserList()) {
response.setText(txt.toString());
execute(response);
}
}
}

/**
* Шабонный метод отправки сообщения пользователю
*
* @param response - метод обработки сообщения
* @param msg - сообщение
*/
private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
response.setText(msg);
execute(response);
}
}
Фрагмент кода с отправкой в Kafka и записью в БД
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}

Переходим к созданию бизнес логики приложения​

- BaseService - реализует базовые методы сервисов проекта

BaseService
public class BaseService {

/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null
*/
public <T> T wrapResult(T result) {
if(result == null)
throw new NotFoundException();
return result;
}

/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null или пустой
*/
public <T> List<T> wrapResults(List<T> result) {
if(result == null || result.size() == 0)
throw new NotFoundException();
return result;
}

}
- Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД

UserService
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {

//beans
protected final IUserRepository repo;

/**
* Возвращает список записей
*
* @return список записей
* @throws DbException в случае ошибки БД
*/
public List<User> getUserList() {
log.trace("#### getUserList() - working");
return wrapResults(repo.getUserList());
}

/**
* Возвращает список записей по id
*
* @throws DbException в случае ошибки БД
*/
public User getById(int id) {
log.trace("#### getById() [id={}]", id);
return wrapResult(repo.getById(id));
}

/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
public void insert(User entity) {
log.trace("#### insert() [entity={}]", entity);
repo.insert(entity);
}

/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
public void delete(User entity) {
log.trace("#### delete() [entity={}]", entity);
repo.delete(entity);
}

}
- Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет

Producer
@Service
@Slf4j
public class Producer {

private static final String TOPIC = "users";
protected final IUserRepository repo;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public Producer(IUserRepository repo) {
this.repo = repo;
}

public void sendMessage(User user) {
if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
log.info("#### Producing message ", user);
kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
}
}
[HEADING=3]В конце класс, который собственно и запускает все наше приложене[/HEADING]
WebHookApp
@Slf4j
@SpringBootApplication
public class WebHookApp {
public static void main(String[] args) {
SpringApplication.run(WebHookApp.class, args);
}
}
[HEADING=1]Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:[/HEADING]
- Сначала проверим, запущена ли Kafka

[IMG alt="запуск по команде - sudo su systemctl start kafka"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/544/243/e5a/544243e5a3f499a48b5226a71b95636f.png[/IMG]запуск по команде - sudo su systemctl start kafka
- После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer

[IMG alt="Запущенный брокер"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/f77/787/9aa/f777879aad640f1fbcc378ba08c8eb1b.png[/IMG]Запущенный брокер
- Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:

CREATE TABLE public.log (
id serial4 NOT NULL,
message varchar(500) NOT NULL,
date_time date NOT NULL,
topic varchar(100) NOT NULL,
CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
id serial4 NOT NULL,
user_name varchar(100) NOT NULL,
description varchar(500) NULL,
CONSTRAINT user_table_pkey PRIMARY KEY (id)
);
[IMG alt="Схема БД public"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/ce5/f86/990/ce5f8699013d657c6c0d71528dbc1325.png[/IMG]Схема БД public[IMG alt="Вот как выглядят таблица log"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/a00/9a1/e31/a009a1e31faeebf410f8a7fe8c4d1e16.png[/IMG]Вот как выглядят таблица log[IMG alt="Вот как выглядит таблица user_table"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/157/fb1/7ba/157fb17ba71ee7cb677bb92981b988d6.png[/IMG]Вот как выглядит таблица user_table
[HEADING=1]Отлично, блюдо запеклось и готово к подаче:[/HEADING]
- Запускаем проект, проверяем, что все настроено и корректно работает

[IMG alt="Spring logs"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/147/bb1/e20/147bb1e20aae4d7917c0738559f1c417.png[/IMG]Spring logs
- Открываем телеграмм и пробуем на вкус нашего "Франкенштейна"

[LIST]
[*]Пишем - /start и начинаем тест ... Я в шоке, оно живое !
[/LIST]
[IMG alt="Общение с ботом в Телеграмм"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/bb8/b50/294/bb8b50294a0c8c63543ada506483a047.png[/IMG]Общение с ботом в Телеграмм
- Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?

Логи нашего бота, ошибок не наблюдается
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.2)
2022-01-15 16:46:19.248 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : The following profiles are active: bot
2022-01-15 16:46:19.291 WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887 INFO 412498 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-15 16:46:19.887 INFO 412498 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956 INFO 412498 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957 INFO 412498 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013 INFO 412498 --- [ main] c.secretary.bot.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565 INFO 412498 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916 INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-01-15 16:52:33.947 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056 INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-01-15 16:54:01.188 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
- Как мы видим, сообщения отправленные Боту появились в БД

[IMG alt="Записи в БД"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/dde/38f/1f7/dde38f1f7a55af7ce3ac7bb782bac34e.png[/IMG]Записи в БД
- Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users

[IMG alt="Вкладка topics"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/719/3df/6b5/7193df6b5aec4ed9c630730399277921.png[/IMG]Вкладка topics
- Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA

[IMG alt="Информация о топике users"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/e0b/3f9/a92/e0b3f9a926feee20e359140ebf7066b4.png[/IMG]Информация о топике users
- В открывшемся окне, ставим такие же настройки (самая важная из них это Start From - указывает, с какого момента показывать сообщения в Kafka, наша настройка - показывает все сообщения, включая отправленые ранее)

[IMG alt="Настройки просмотра сообщений"]https://habrastorage.org/r/w1560/getpro/habr/upload_files/5cb/23f/5ed/5cb23f5ed3ec2686ce370f8d3a5481de.png[/IMG]Настройки просмотра сообщений
- Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении

[IMG alt="Прилетевшие в Kafka сообщения "]https://habrastorage.org/r/w1560/getpro/habr/upload_files/d54/a27/edd/d54a27eddab92677fdeedb5e9e316672.png[/IMG]

Прилетевшие в Kafka сообщения
[QUOTE]
Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во [URL='https://habr.com/ru/sandbox/165371/'][B]второй части[/B][/URL] этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.
[/QUOTE]

https://habr.com/ru/post/655329/
 
Сверху