Начало работы с MongoDB и Redis на Rust

Kate

Administrator
Команда форума
В этой статье будет показано как создать Rust бэкэнд, который использует MongoDB, документо-ориентированную БД, для хранения данных и Redis для кэширования, ограничения количества HTTP запросов и нотификаций пользователя. Для большей наглядности созданное приложение также будет предоставлять REST API. В итоге будет получена следующая архитектура:


architecture



MongoDB является хранилищем, в то время как Redis используется для следующего:


Обратите внимание, что указанные сценарии использования не означают, что для похожего сценария вам нужно использовать подход, описанный в статье. Примеры в первую очередь имеют целью познакомить вас с MongoDB и Redis.


Проект реализован с помощью MongoDB Rust driver и крейта redis-rs.


Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.


Доменная модель включает данные о планетах Солнечной системы и их спутниках.


Запуск MongoDB и Redis​


Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.


Обе тулы могут быть запущены как Docker контейнер:


docker-compose.yml


version: '3.8'

services:

...

mongodb:
image: mongo
container_name: mongodb
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
MONGO_INITDB_DATABASE: solar_system_info
ports:
- 27017:27017

mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

redis:
image: redis:alpine
container_name: redis
ports:
- 6379:6379

Назначение контейнера mongodb_seed будет показано далее.


Вы можете получить доступ к mongo shell с помощью следующей команды:


docker exec -it mongodb mongo --username admin --password password


(где mongodb — название Docker контейнера, mongo — shell)


Теперь вы можете выполнять команды, например:


  • получить список баз данных с помощью show dbs
  • получить все данные в определённой базе данных:
    • use solar_system_info
    • show collections
    • db.planets.find()

Доступ к Redis CLI может быть получен с помощью следующей команды:


docker exec -it redis redis-cli


Простейший пример команды выглядит так:


Пример команды Redis


> set mykey somevalue
OK
> get mykey
"somevalue"

Для получения списка ключей используйте команду keys *.


Вы можете найти больше примеров команд для Redis CLI в этом гайде.


Инициализация данных​


MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb_seed и команды mongoimport:


docker-compose.yml


mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json

Также инициализация БД может быть выполнена с использованием JavaScript файла.


Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующего скрипта:


mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg


Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images должна быть смонтирована как volume в определении Docker Compose сервиса)


Далее будет показано как использовать MongoDB и Redis в Rust приложении.


Имплементация приложения​


Зависимости​


Приложение имплементировано с помощью:



Cargo.toml


[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"

[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"

Структура проекта​


Структура проекта


├───images

├───mongodb-init
│ init.json

└───src
db.rs
dto.rs
errors.rs
handlers.rs
index.html
main.rs
model.rs
redis.rs
services.rs

Функция main​


Функция main


#[actix_web::main]
async fn main() -> std:🇮🇴:Result<()> {
dotenv::from_filename(".env.local").ok();
env_logger::init();

info!("Starting MongoDB & Redis demo server");

let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
let mongodb_client = MongoDbClient::new(mongodb_uri).await;

let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");

let planet_service = Arc::new(PlanetService::new(
mongodb_client,
redis_client,
redis_connection_manager.clone(),
));
let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));

...
}

Здесь определён кастомный MongoDbClient, клиент Redis и менеджер соединений Redis.


Работа с MongoDB​


Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:


Функция, возвращающая список планет


const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";

pub async fn get_planets(
&self,
planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
let filter = planet_type.map(|pt| {
doc! { "type": pt.to_string() }
});

let mut planets = self.get_planets_collection().find(filter, None).await?;

let mut result: Vec<Planet> = Vec::new();
while let Some(planet) = planets.next().await {
result.push(planet?);
}

Ok(result)
}

fn get_planets_collection(&self) -> Collection<Planet> {
self.client
.database(DB_NAME)
.collection::<Planet>(COLLECTION_NAME)
}

get_planets также включает пример фильтрации документов MongoDB по определённому критерию.


Модель данных выглядит так:


Модель данных


#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub name: String,
pub r#type: PlanetType,
pub mean_radius: f32,
pub satellites: Option<Vec<Satellite>>,
}

#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
TerrestrialPlanet,
GasGiant,
IceGiant,
DwarfPlanet,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
pub name: String,
pub first_spacecraft_landing_date: Option<mongodb::bson::DateTime>,
}

Структуры содержат поля "обычных" типов (string, f32), а также:


  • ObjectId (Planet.id)
  • список (Planet.satellites)
  • дата/timestamp (Satellite.first_spacecraft_landing_date)
  • перечисление (Planet.type)
  • nullable поля (Planet.id, Planet.satellites)

Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:



MongoDB документы хранятся в формате BSON.


Работа с Redis​


Redis клиент создаётся следующим образом:


Создание Redis клиента


pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
Ok(Client::eek:pen(redis_uri)?)
}

Менеджер соединений Redis может быть создан так:


Получение менеджера соединений Redis


let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");

Кэширование​


Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:


Получение планеты по id


pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
let cache_key = self.get_planet_cache_key(planet_id);
let mut con = self.redis_client.get_async_connection().await?;
let cache_is_empty: bool = !con.exists(&cache_key).await?;

let planet = if cache_is_empty {
debug!("Use database to retrieve a planet by id: {}", &planet_id);
let result: Planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let _: () = con.set(&cache_key, &result).await?;
result
} else {
debug!("Use cache to retrieve a planet by id: {}", planet_id);
let cached_value = con.get(&cache_key).await?;
let planet_string: String = FromRedisValue::from_redis_value(&cached_value)?;
serde_json::from_str(&planet_string)?
};

Ok(planet)
}

В первой ветви if-else вы видите пример как поместить пару ключ-значение в Redis используя функцию set; во второй ветви показано как получить значение из кэша по ключу. Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs для структуры:


Имплементация трейта ToRedisArgs


impl ToRedisArgs for &Planet {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
}
}

В функции get_planet используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager, на примере очистки кэша с использованием функции del:


Пример очистки кэша


pub async fn update_planet(
&self,
planet_id: &str,
planet: Planet,
) -> Result<Planet, CustomError> {
let updated_planet = self
.mongodb_client
.update_planet(ObjectId::from_str(planet_id)?, planet)
.await?;

let cache_key = self.get_planet_cache_key(planet_id);
self.redis_connection_manager.clone().del(cache_key).await?;

Ok(updated_planet)
}

ConnectionManager может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.


Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set/get):


Кэширование изображений


pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
let cache_key = self.get_image_cache_key(planet_id);
let mut redis_connection_manager = self.redis_connection_manager.clone();
let cache_is_empty: bool = !redis_connection_manager.exists(&cache_key).await?;

let image: Vec<u8> = if cache_is_empty {
debug!(
"Use database to retrieve an image of a planet by id: {}",
&planet_id
);
let planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let result = crate::db::get_image_of_planet(&planet.name).await;
let _: () = redis_connection_manager
.set(&cache_key, result.clone())
.await?;
result
} else {
debug!(
"Use cache to retrieve an image of a planet by id: {}",
&planet_id
);
redis_connection_manager.get(&cache_key).await?
};

Ok(image)
}

Кэширование может быть протестировано с использованием REST API, описанного выше.


Ограничение количества HTTP запросов​


Эта фича реализована в соответствии с официальным гайдом следующим образом:


Имплементация rate limiter


#[derive(Clone)]
pub struct RateLimitingService {
redis_connection_manager: ConnectionManager,
}

impl RateLimitingService {
pub fn new(redis_connection_manager: ConnectionManager) -> Self {
RateLimitingService {
redis_connection_manager,
}
}

pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
let current_minute = Utc::now().minute();
let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);

let (count,): (u64,) = redis::pipe()
.atomic()
.incr(&rate_limit_key, 1)
.expire(&rate_limit_key, 60)
.ignore()
.query_async(&mut self.redis_connection_manager.clone())
.await?;

if count > MAX_REQUESTS_PER_MINUTE {
Err(TooManyRequests {
actual_count: count,
permitted_count: MAX_REQUESTS_PER_MINUTE,
})
} else {
Ok(())
}
}
}

Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.


Rate limiter может быть использован в Actix обработчике следующим образом:


Использование rate limiter


pub async fn get_planets(
req: HttpRequest,
web::Query(query_params): web::Query<GetPlanetsQueryParams>,
rate_limit_service: web::Data<Arc<RateLimitingService>>,
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
rate_limit_service
.assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
.await?;

let planets = planet_service.get_planets(query_params.r#type).await?;
Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}

Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:


rate limiting



Нотификации​


В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.


При создании сущности публикуется событие:


Публикация события в Redis


pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
let planet = self.mongodb_client.create_planet(planet).await?;
self.redis_connection_manager
.clone()
.publish(
NEW_PLANETS_CHANNEL_NAME,
serde_json::to_string(&PlanetMessage::from(&planet))?,
)
.await?;
Ok(planet)
}

Подписка реализуется так:


Пример подписки в Redis


pub async fn get_new_planets_stream(
&self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);

tx.send(Ok(Bytes::from("data: Connected\n\n")))
.await
.expect("Can't send a message to the stream");

let mut pubsub_con = self
.redis_client
.get_async_connection()
.await?
.into_pubsub();
pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;

tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
tx.send(Ok(msg))
.await
.expect("Can't send a message to the stream");
}
});

Ok(rx)
}

Подписка используется в Actix обработчике так:


Пример SSE handler


pub async fn sse(
planet_service: web::Data<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
let new_planets_stream = planet_service.get_new_planets_stream().await?;
let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);

Ok(HttpResponse::build(StatusCode::OK)
.insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
.streaming(response_stream))
}

Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:


  • подписка из браузера
    Перейдите на http://localhost:9000/, где находится HTML страница:
    sse notifications browser
  • подписка из командной строки с использованием cURL
    Используйте curl -X GET localhost:9000/events:
    sse notifications curl

Для генерации события используется следующий cURL запрос:


Запрос для тестирования нотификаций


curl -X POST -H 'Content-Type: application/json' -d '{
\"name\": \"Pluto\",
\"type\": \"DwarfPlanet\",
\"mean_radius\": 1188,
\"satellites\": null
}' localhost:9000/planets

Веб приложение​


Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.


REST API обработчики​


REST API обработчики определены так:


Определение REST API обработчиков


#[actix_web::main]
async fn main() -> std:🇮🇴:Result<()> {
...

let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
.expect("ENABLE_WRITE_HANDLERS env var should be specified")
.parse::<bool>()
.expect("Can't parse ENABLE_WRITE_HANDLERS");

HttpServer::new(move || {
let mut app = App::new()
.route("/planets", web::get().to(handlers::get_planets))
.route("/planets/{planet_id}", web::get().to(handlers::get_planet))
.route(
"/planets/{planet_id}/image",
web::get().to(handlers::get_image_of_planet),
)
.route("/events", web::get().to(handlers::sse))
.route("/", web::get().to(handlers::index))
.data(Arc::clone(&planet_service))
.data(Arc::clone(&rate_limiting_service));

if enable_write_handlers {
app = app
.route("/planets", web::post().to(handlers::create_planet))
.route(
"/planets/{planet_id}",
web::put().to(handlers::update_planet),
)
.route(
"/planets/{planet_id}",
web::delete().to(handlers::delete_planet),
);
}

app
})
.bind("0.0.0.0:9000")?
.run()
.await
}

Обработка ошибок​


Обработка ошибок имплементирована в соответствии с документацией.


Запуск и тестирование​


Локально проект может быть запущен двумя способами:


  • с использованием Docker Compose (docker-compose.yml):
    docker compose up (или docker-compose up в более старых версиях Docker)
  • без использования Docker
    Запустите приложение с помощью cargo run (в этом случае сервис mongodb-redis в docker-compose.yml должен быть отключён)

CI/CD​


CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.


Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET эндпоинтов, например:


GET http://demo.romankudryashov.com:9000/planets


Пишущие методы REST API недоступны на production среде.


Заключение​


В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!

Источник статьи: https://habr.com/ru/post/568856/
 
Сверху