В этой статье будет показано как создать Rust бэкэнд, который использует MongoDB, документо-ориентированную БД, для хранения данных и Redis для кэширования, ограничения количества HTTP запросов и нотификаций пользователя. Для большей наглядности созданное приложение также будет предоставлять REST API. В итоге будет получена следующая архитектура:
MongoDB является хранилищем, в то время как Redis используется для следующего:
Проект реализован с помощью MongoDB Rust driver и крейта redis-rs.
Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.
Доменная модель включает данные о планетах Солнечной системы и их спутниках.
Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.
Обе тулы могут быть запущены как Docker контейнер:
docker-compose.yml
version: '3.8'
services:
...
mongodb:
image: mongo
container_name: mongodb
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
MONGO_INITDB_DATABASE: solar_system_info
ports:
- 27017:27017
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
redis:
image: redis:alpine
container_name: redis
ports:
- 6379:6379
Назначение контейнера mongodb_seed будет показано далее.
Вы можете получить доступ к mongo shell с помощью следующей команды:
docker exec -it mongodb mongo --username admin --password password
(где mongodb — название Docker контейнера, mongo — shell)
Теперь вы можете выполнять команды, например:
Доступ к Redis CLI может быть получен с помощью следующей команды:
docker exec -it redis redis-cli
Простейший пример команды выглядит так:
Пример команды Redis
> set mykey somevalue
OK
> get mykey
"somevalue"
Для получения списка ключей используйте команду keys *.
Вы можете найти больше примеров команд для Redis CLI в этом гайде.
MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb_seed и команды mongoimport:
docker-compose.yml
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
Также инициализация БД может быть выполнена с использованием JavaScript файла.
Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующего скрипта:
mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg
Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images должна быть смонтирована как volume в определении Docker Compose сервиса)
Далее будет показано как использовать MongoDB и Redis в Rust приложении.
Приложение имплементировано с помощью:
Cargo.toml
[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"
[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"
Структура проекта
├───images
│
├───mongodb-init
│ init.json
│
└───src
db.rs
dto.rs
errors.rs
handlers.rs
index.html
main.rs
model.rs
redis.rs
services.rs
Функция main
#[actix_web::main]
async fn main() -> std:
:Result<()> {
dotenv::from_filename(".env.local").ok();
env_logger::init();
info!("Starting MongoDB & Redis demo server");
let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
let mongodb_client = MongoDbClient::new(mongodb_uri).await;
let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
let planet_service = Arc::new(PlanetService::new(
mongodb_client,
redis_client,
redis_connection_manager.clone(),
));
let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));
...
}
Здесь определён кастомный MongoDbClient, клиент Redis и менеджер соединений Redis.
Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:
Функция, возвращающая список планет
const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";
pub async fn get_planets(
&self,
planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
let filter = planet_type.map(|pt| {
doc! { "type": pt.to_string() }
});
let mut planets = self.get_planets_collection().find(filter, None).await?;
let mut result: Vec<Planet> = Vec::new();
while let Some(planet) = planets.next().await {
result.push(planet?);
}
Ok(result)
}
fn get_planets_collection(&self) -> Collection<Planet> {
self.client
.database(DB_NAME)
.collection::<Planet>(COLLECTION_NAME)
}
get_planets также включает пример фильтрации документов MongoDB по определённому критерию.
Модель данных выглядит так:
Модель данных
#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub name: String,
pub r#type: PlanetType,
pub mean_radius: f32,
pub satellites: Option<Vec<Satellite>>,
}
#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
TerrestrialPlanet,
GasGiant,
IceGiant,
DwarfPlanet,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
pub name: String,
pub first_spacecraft_landing_date: Option<mongodb::bson:
ateTime>,
}
Структуры содержат поля "обычных" типов (string, f32), а также:
Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:
MongoDB документы хранятся в формате BSON.
Redis клиент создаётся следующим образом:
Создание Redis клиента
pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
Ok(Client:
pen(redis_uri)?)
}
Менеджер соединений Redis может быть создан так:
Получение менеджера соединений Redis
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:
Получение планеты по id
pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
let cache_key = self.get_planet_cache_key(planet_id);
let mut con = self.redis_client.get_async_connection().await?;
let cache_is_empty: bool = !con.exists(&cache_key).await?;
let planet = if cache_is_empty {
debug!("Use database to retrieve a planet by id: {}", &planet_id);
let result: Planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let _: () = con.set(&cache_key, &result).await?;
result
} else {
debug!("Use cache to retrieve a planet by id: {}", planet_id);
let cached_value = con.get(&cache_key).await?;
let planet_string: String = FromRedisValue::from_redis_value(&cached_value)?;
serde_json::from_str(&planet_string)?
};
Ok(planet)
}
В первой ветви if-else вы видите пример как поместить пару ключ-значение в Redis используя функцию set; во второй ветви показано как получить значение из кэша по ключу. Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs для структуры:
Имплементация трейта ToRedisArgs
impl ToRedisArgs for &Planet {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
}
}
В функции get_planet используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager, на примере очистки кэша с использованием функции del:
Пример очистки кэша
pub async fn update_planet(
&self,
planet_id: &str,
planet: Planet,
) -> Result<Planet, CustomError> {
let updated_planet = self
.mongodb_client
.update_planet(ObjectId::from_str(planet_id)?, planet)
.await?;
let cache_key = self.get_planet_cache_key(planet_id);
self.redis_connection_manager.clone().del(cache_key).await?;
Ok(updated_planet)
}
ConnectionManager может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.
Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set/get):
Кэширование изображений
pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
let cache_key = self.get_image_cache_key(planet_id);
let mut redis_connection_manager = self.redis_connection_manager.clone();
let cache_is_empty: bool = !redis_connection_manager.exists(&cache_key).await?;
let image: Vec<u8> = if cache_is_empty {
debug!(
"Use database to retrieve an image of a planet by id: {}",
&planet_id
);
let planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let result = crate::db::get_image_of_planet(&planet.name).await;
let _: () = redis_connection_manager
.set(&cache_key, result.clone())
.await?;
result
} else {
debug!(
"Use cache to retrieve an image of a planet by id: {}",
&planet_id
);
redis_connection_manager.get(&cache_key).await?
};
Ok(image)
}
Кэширование может быть протестировано с использованием REST API, описанного выше.
Эта фича реализована в соответствии с официальным гайдом следующим образом:
Имплементация rate limiter
#[derive(Clone)]
pub struct RateLimitingService {
redis_connection_manager: ConnectionManager,
}
impl RateLimitingService {
pub fn new(redis_connection_manager: ConnectionManager) -> Self {
RateLimitingService {
redis_connection_manager,
}
}
pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
let current_minute = Utc::now().minute();
let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);
let (count,): (u64,) = redis:
ipe()
.atomic()
.incr(&rate_limit_key, 1)
.expire(&rate_limit_key, 60)
.ignore()
.query_async(&mut self.redis_connection_manager.clone())
.await?;
if count > MAX_REQUESTS_PER_MINUTE {
Err(TooManyRequests {
actual_count: count,
permitted_count: MAX_REQUESTS_PER_MINUTE,
})
} else {
Ok(())
}
}
}
Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.
Rate limiter может быть использован в Actix обработчике следующим образом:
Использование rate limiter
pub async fn get_planets(
req: HttpRequest,
web::Query(query_params): web::Query<GetPlanetsQueryParams>,
rate_limit_service: web:
ata<Arc<RateLimitingService>>,
planet_service: web:
ata<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
rate_limit_service
.assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
.await?;
let planets = planet_service.get_planets(query_params.r#type).await?;
Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}
Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:
В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.
При создании сущности публикуется событие:
Публикация события в Redis
pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
let planet = self.mongodb_client.create_planet(planet).await?;
self.redis_connection_manager
.clone()
.publish(
NEW_PLANETS_CHANNEL_NAME,
serde_json::to_string(&PlanetMessage::from(&planet))?,
)
.await?;
Ok(planet)
}
Подписка реализуется так:
Пример подписки в Redis
pub async fn get_new_planets_stream(
&self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);
tx.send(Ok(Bytes::from("data: Connected\n\n")))
.await
.expect("Can't send a message to the stream");
let mut pubsub_con = self
.redis_client
.get_async_connection()
.await?
.into_pubsub();
pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;
tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
tx.send(Ok(msg))
.await
.expect("Can't send a message to the stream");
}
});
Ok(rx)
}
Подписка используется в Actix обработчике так:
Пример SSE handler
pub async fn sse(
planet_service: web:
ata<Arc<PlanetService>>,
) -> Result<HttpResponse, CustomError> {
let new_planets_stream = planet_service.get_new_planets_stream().await?;
let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);
Ok(HttpResponse::build(StatusCode::OK)
.insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
.streaming(response_stream))
}
Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:
Для генерации события используется следующий cURL запрос:
Запрос для тестирования нотификаций
curl -X POST -H 'Content-Type: application/json' -d '{
\"name\": \"Pluto\",
\"type\": \"DwarfPlanet\",
\"mean_radius\": 1188,
\"satellites\": null
}' localhost:9000/planets
Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.
REST API обработчики определены так:
Определение REST API обработчиков
#[actix_web::main]
async fn main() -> std:
:Result<()> {
...
let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
.expect("ENABLE_WRITE_HANDLERS env var should be specified")
.parse::<bool>()
.expect("Can't parse ENABLE_WRITE_HANDLERS");
HttpServer::new(move || {
let mut app = App::new()
.route("/planets", web::get().to(handlers::get_planets))
.route("/planets/{planet_id}", web::get().to(handlers::get_planet))
.route(
"/planets/{planet_id}/image",
web::get().to(handlers::get_image_of_planet),
)
.route("/events", web::get().to(handlers::sse))
.route("/", web::get().to(handlers::index))
.data(Arc::clone(&planet_service))
.data(Arc::clone(&rate_limiting_service));
if enable_write_handlers {
app = app
.route("/planets", web:
ost().to(handlers::create_planet))
.route(
"/planets/{planet_id}",
web:
ut().to(handlers::update_planet),
)
.route(
"/planets/{planet_id}",
web::delete().to(handlers::delete_planet),
);
}
app
})
.bind("0.0.0.0:9000")?
.run()
.await
}
Обработка ошибок имплементирована в соответствии с документацией.
Локально проект может быть запущен двумя способами:
CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.
Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET эндпоинтов, например:
GET http://demo.romankudryashov.com:9000/planets
Пишущие методы REST API недоступны на production среде.
В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!
Источник статьи: https://habr.com/ru/post/568856/

MongoDB является хранилищем, в то время как Redis используется для следующего:
- кэш (включая изображения)
- ограничение количества HTTP запросов
- нотификации с использованием паттерна publish-subscribe
Проект реализован с помощью MongoDB Rust driver и крейта redis-rs.
Вы сможете протестировать REST API приложения, поскольку оно развёрнуто на Google Cloud Platform.
Доменная модель включает данные о планетах Солнечной системы и их спутниках.
Запуск MongoDB и Redis
Этот раздел не требует навыков программирования на Rust и может быть использован вне зависимости от языка программирования приложения.
Обе тулы могут быть запущены как Docker контейнер:
docker-compose.yml
version: '3.8'
services:
...
mongodb:
image: mongo
container_name: mongodb
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: $MONGODB_USERNAME
MONGO_INITDB_ROOT_PASSWORD: $MONGODB_PASSWORD
MONGO_INITDB_DATABASE: solar_system_info
ports:
- 27017:27017
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
redis:
image: redis:alpine
container_name: redis
ports:
- 6379:6379
Назначение контейнера mongodb_seed будет показано далее.
Вы можете получить доступ к mongo shell с помощью следующей команды:
docker exec -it mongodb mongo --username admin --password password
(где mongodb — название Docker контейнера, mongo — shell)
Теперь вы можете выполнять команды, например:
- получить список баз данных с помощью show dbs
- получить все данные в определённой базе данных:
- use solar_system_info
- show collections
- db.planets.find()
Доступ к Redis CLI может быть получен с помощью следующей команды:
docker exec -it redis redis-cli
Простейший пример команды выглядит так:
Пример команды Redis
> set mykey somevalue
OK
> get mykey
"somevalue"
Для получения списка ключей используйте команду keys *.
Вы можете найти больше примеров команд для Redis CLI в этом гайде.
Инициализация данных
MongoDB инициализируется данными в формате JSON с использованием контейнера mongodb_seed и команды mongoimport:
docker-compose.yml
mongodb-seed:
image: mongo
container_name: mongodb-seed
depends_on:
- mongodb
volumes:
- ./mongodb-init:/mongodb-init
links:
- mongodb
command:
mongoimport --host mongodb --db solar_system_info --collection planets --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD --drop --jsonArray --file /mongodb-init/init.json
Также инициализация БД может быть выполнена с использованием JavaScript файла.
Приложение также работает с изображениями планет. Первоначально я собирался хранить их в MongoDB; это может быть сделано с помощью следующего скрипта:
mongofiles --host mongodb --db solar_system_info --authenticationDatabase admin --username $MONGODB_USERNAME --password $MONGODB_PASSWORD put /mongodb-init/images/*.jpg
Однако вскоре обнаружилось, что изображения не могут быть получены из БД из-за отсутствия поддержки GridFS в MongoDB Rust Driver (открытая задача). Поэтому для простоты используется крейт rust_embed, который позволяет включить изображения в бинарный исполняемый файл приложения во время компиляции (при разработке изображения загружаются из файловой системы). (Изображения также возможно хранить отдельно от приложения; папка images должна быть смонтирована как volume в определении Docker Compose сервиса)
Далее будет показано как использовать MongoDB и Redis в Rust приложении.
Имплементация приложения
Зависимости
Приложение имплементировано с помощью:
Cargo.toml
[package]
name = "mongodb-redis"
version = "0.1.0"
edition = "2018"
[dependencies]
mongodb = "2.0.0-beta.1"
redis = { version = "0.20.2", features = ["tokio-comp", "connection-manager"] }
actix-web = "4.0.0-beta.7"
tokio = "1.7.1"
tokio-stream = "0.1.6"
chrono = { version = "0.4.19", features = ["serde"] }
serde = "1.0.126"
serde_json = "1.0.64"
dotenv = "0.15.0"
derive_more = "0.99.14"
log = "0.4.14"
env_logger = "0.8.4"
rust-embed = "5.9.0"
mime = "0.3.16"
Структура проекта
Структура проекта
├───images
│
├───mongodb-init
│ init.json
│
└───src
db.rs
dto.rs
errors.rs
handlers.rs
index.html
main.rs
model.rs
redis.rs
services.rs
Функция main
Функция main
#[actix_web::main]
async fn main() -> std:

dotenv::from_filename(".env.local").ok();
env_logger::init();
info!("Starting MongoDB & Redis demo server");
let mongodb_uri = env::var("MONGODB_URI").expect("MONGODB_URI env var should be specified");
let mongodb_client = MongoDbClient::new(mongodb_uri).await;
let redis_uri = env::var("REDIS_URI").expect("REDIS_URI env var should be specified");
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
let planet_service = Arc::new(PlanetService::new(
mongodb_client,
redis_client,
redis_connection_manager.clone(),
));
let rate_limiting_service = Arc::new(RateLimitingService::new(redis_connection_manager));
...
}
Здесь определён кастомный MongoDbClient, клиент Redis и менеджер соединений Redis.
Работа с MongoDB
Начнём с функции, возвращающей список планет, хранящихся в БД, и использующей асинхронный API:
Функция, возвращающая список планет
const DB_NAME: &str = "solar_system_info";
const COLLECTION_NAME: &str = "planets";
pub async fn get_planets(
&self,
planet_type: Option<PlanetType>,
) -> Result<Vec<Planet>, CustomError> {
let filter = planet_type.map(|pt| {
doc! { "type": pt.to_string() }
});
let mut planets = self.get_planets_collection().find(filter, None).await?;
let mut result: Vec<Planet> = Vec::new();
while let Some(planet) = planets.next().await {
result.push(planet?);
}
Ok(result)
}
fn get_planets_collection(&self) -> Collection<Planet> {
self.client
.database(DB_NAME)
.collection::<Planet>(COLLECTION_NAME)
}
get_planets также включает пример фильтрации документов MongoDB по определённому критерию.
Модель данных выглядит так:
Модель данных
#[derive(Serialize, Deserialize, Debug)]
pub struct Planet {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
pub name: String,
pub r#type: PlanetType,
pub mean_radius: f32,
pub satellites: Option<Vec<Satellite>>,
}
#[derive(Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub enum PlanetType {
TerrestrialPlanet,
GasGiant,
IceGiant,
DwarfPlanet,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Satellite {
pub name: String,
pub first_spacecraft_landing_date: Option<mongodb::bson:
}
Структуры содержат поля "обычных" типов (string, f32), а также:
- ObjectId (Planet.id)
- список (Planet.satellites)
- дата/timestamp (Satellite.first_spacecraft_landing_date)
- перечисление (Planet.type)
- nullable поля (Planet.id, Planet.satellites)
Проект также включает примеры получения, создания, обновления и удаления MongoDB документов. Я не буду подробно останавливаться на этих функциях ввиду очевидности кода их имплементации. Вы можете протестировать эти функции используя REST API:
- получение всего списка
GET http://localhost:9000/planets
Пример с фильтрацией:
GET http://localhost:9000/planets?type=IceGiant - создание
POST http://localhost:9000/planets
Body:
{
"name": "Pluto",
"type": "DwarfPlanet",
"mean_radius": 1188,
"satellites": null
}
- получение по id
GET http://localhost:9000/{planet_id} - обновление
PUT http://localhost:9000/{planet_id}
Body:
{
"name": "Mercury",
"type": "TerrestrialPlanet",
"mean_radius": 2439.7,
"satellites": null
}
- удаление
DELETE http://localhost:9000/{planet_id} - получение изображения планеты
GET http://localhost:9000/planets/{planet_id}/image
Используйте этот метод для тестирования кэширования с помощью Redis
MongoDB документы хранятся в формате BSON.
Работа с Redis
Redis клиент создаётся следующим образом:
Создание Redis клиента
pub async fn create_client(redis_uri: String) -> Result<Client, RedisError> {
Ok(Client:
}
Менеджер соединений Redis может быть создан так:
Получение менеджера соединений Redis
let redis_client = redis::create_client(redis_uri)
.await
.expect("Can't create Redis client");
let redis_connection_manager = redis_client
.get_tokio_connection_manager()
.await
.expect("Can't create Redis connection manager");
Кэширование
Рассмотрим функцию сервисного слоя, использующуюся для получения планеты по id:
Получение планеты по id
pub async fn get_planet(&self, planet_id: &str) -> Result<Planet, CustomError> {
let cache_key = self.get_planet_cache_key(planet_id);
let mut con = self.redis_client.get_async_connection().await?;
let cache_is_empty: bool = !con.exists(&cache_key).await?;
let planet = if cache_is_empty {
debug!("Use database to retrieve a planet by id: {}", &planet_id);
let result: Planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let _: () = con.set(&cache_key, &result).await?;
result
} else {
debug!("Use cache to retrieve a planet by id: {}", planet_id);
let cached_value = con.get(&cache_key).await?;
let planet_string: String = FromRedisValue::from_redis_value(&cached_value)?;
serde_json::from_str(&planet_string)?
};
Ok(planet)
}
В первой ветви if-else вы видите пример как поместить пару ключ-значение в Redis используя функцию set; во второй ветви показано как получить значение из кэша по ключу. Для помещения значения в кэш вам нужно имплементировать трейт ToRedisArgs для структуры:
Имплементация трейта ToRedisArgs
impl ToRedisArgs for &Planet {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
out.write_arg_fmt(serde_json::to_string(self).expect("Can't serialize Planet as string"))
}
}
В функции get_planet используется асинхронное соединение Redis. Следующий пример демонстрирует другой подход, ConnectionManager, на примере очистки кэша с использованием функции del:
Пример очистки кэша
pub async fn update_planet(
&self,
planet_id: &str,
planet: Planet,
) -> Result<Planet, CustomError> {
let updated_planet = self
.mongodb_client
.update_planet(ObjectId::from_str(planet_id)?, planet)
.await?;
let cache_key = self.get_planet_cache_key(planet_id);
self.redis_connection_manager.clone().del(cache_key).await?;
Ok(updated_planet)
}
ConnectionManager может быть клонирован. Он также используется во всех оставшихся примерах использования Redis вместо Redis клиента.
Кэш изображений может быть имплементирован так же, как и кэши других типов данных (используя функции set/get):
Кэширование изображений
pub async fn get_image_of_planet(&self, planet_id: &str) -> Result<Vec<u8>, CustomError> {
let cache_key = self.get_image_cache_key(planet_id);
let mut redis_connection_manager = self.redis_connection_manager.clone();
let cache_is_empty: bool = !redis_connection_manager.exists(&cache_key).await?;
let image: Vec<u8> = if cache_is_empty {
debug!(
"Use database to retrieve an image of a planet by id: {}",
&planet_id
);
let planet = self
.mongodb_client
.get_planet(ObjectId::from_str(planet_id)?)
.await?;
let result = crate::db::get_image_of_planet(&planet.name).await;
let _: () = redis_connection_manager
.set(&cache_key, result.clone())
.await?;
result
} else {
debug!(
"Use cache to retrieve an image of a planet by id: {}",
&planet_id
);
redis_connection_manager.get(&cache_key).await?
};
Ok(image)
}
Кэширование может быть протестировано с использованием REST API, описанного выше.
Ограничение количества HTTP запросов
Эта фича реализована в соответствии с официальным гайдом следующим образом:
Имплементация rate limiter
#[derive(Clone)]
pub struct RateLimitingService {
redis_connection_manager: ConnectionManager,
}
impl RateLimitingService {
pub fn new(redis_connection_manager: ConnectionManager) -> Self {
RateLimitingService {
redis_connection_manager,
}
}
pub async fn assert_rate_limit_not_exceeded(&self, ip_addr: String) -> Result<(), CustomError> {
let current_minute = Utc::now().minute();
let rate_limit_key = format!("{}:{}:{}", RATE_LIMIT_KEY_PREFIX, ip_addr, current_minute);
let (count,): (u64,) = redis:
.atomic()
.incr(&rate_limit_key, 1)
.expire(&rate_limit_key, 60)
.ignore()
.query_async(&mut self.redis_connection_manager.clone())
.await?;
if count > MAX_REQUESTS_PER_MINUTE {
Err(TooManyRequests {
actual_count: count,
permitted_count: MAX_REQUESTS_PER_MINUTE,
})
} else {
Ok(())
}
}
}
Redis ключ создаётся на каждую минуту + IP адрес клиента. После каждого вызова функции assert_rate_limit_not_exceeded значение, соответствующее ключу, инкрементируется на 1. Чтобы хранилище не переполнилось из-за большого количества ранее созданных пар ключ-значение, ключ "экспайрится" через минуту.
Rate limiter может быть использован в Actix обработчике следующим образом:
Использование rate limiter
pub async fn get_planets(
req: HttpRequest,
web::Query(query_params): web::Query<GetPlanetsQueryParams>,
rate_limit_service: web:
planet_service: web:
) -> Result<HttpResponse, CustomError> {
rate_limit_service
.assert_rate_limit_not_exceeded(get_ip_addr(&req)?)
.await?;
let planets = planet_service.get_planets(query_params.r#type).await?;
Ok(HttpResponse::Ok().json(planets.into_iter().map(PlanetDto::from).collect::<Vec<_>>()))
}
Если вызывать метод получения списка планет слишком часто, то будет получена следующая ошибка:

Нотификации
В этом проекте нотификации реализованы с помощью Redis Pub/Sub и Server-Sent Events для доставки сообщений пользователю.
При создании сущности публикуется событие:
Публикация события в Redis
pub async fn create_planet(&self, planet: Planet) -> Result<Planet, CustomError> {
let planet = self.mongodb_client.create_planet(planet).await?;
self.redis_connection_manager
.clone()
.publish(
NEW_PLANETS_CHANNEL_NAME,
serde_json::to_string(&PlanetMessage::from(&planet))?,
)
.await?;
Ok(planet)
}
Подписка реализуется так:
Пример подписки в Redis
pub async fn get_new_planets_stream(
&self,
) -> Result<Receiver<Result<Bytes, CustomError>>, CustomError> {
let (tx, rx) = mpsc::channel::<Result<Bytes, CustomError>>(100);
tx.send(Ok(Bytes::from("data: Connected\n\n")))
.await
.expect("Can't send a message to the stream");
let mut pubsub_con = self
.redis_client
.get_async_connection()
.await?
.into_pubsub();
pubsub_con.subscribe(NEW_PLANETS_CHANNEL_NAME).await?;
tokio::spawn(async move {
while let Some(msg) = pubsub_con.on_message().next().await {
let payload = msg.get_payload().expect("Can't get payload of message");
let payload: String = FromRedisValue::from_redis_value(&payload)
.expect("Can't convert from Redis value");
let msg = Bytes::from(format!("data: Planet created: {:?}\n\n", payload));
tx.send(Ok(msg))
.await
.expect("Can't send a message to the stream");
}
});
Ok(rx)
}
Подписка используется в Actix обработчике так:
Пример SSE handler
pub async fn sse(
planet_service: web:
) -> Result<HttpResponse, CustomError> {
let new_planets_stream = planet_service.get_new_planets_stream().await?;
let response_stream = tokio_stream::wrappers::ReceiverStream::new(new_planets_stream);
Ok(HttpResponse::build(StatusCode::OK)
.insert_header(header::ContentType(mime::TEXT_EVENT_STREAM))
.streaming(response_stream))
}
Чтобы протестировать нотификации, вам нужно подписаться на события и сгенерировать событие. Далее приведены два подхода для этого; в обоих событие генерируется с использованием cURL:
- подписка из браузера
Перейдите на http://localhost:9000/, где находится HTML страница:
- подписка из командной строки с использованием cURL
Используйте curl -X GET localhost:9000/events:
Для генерации события используется следующий cURL запрос:
Запрос для тестирования нотификаций
curl -X POST -H 'Content-Type: application/json' -d '{
\"name\": \"Pluto\",
\"type\": \"DwarfPlanet\",
\"mean_radius\": 1188,
\"satellites\": null
}' localhost:9000/planets
Веб приложение
Некоторые аспекты этой темы были включены в предыдущие разделы, поэтому здесь будут освещены некоторые из оставшихся тем.
REST API обработчики
REST API обработчики определены так:
Определение REST API обработчиков
#[actix_web::main]
async fn main() -> std:

...
let enable_write_handlers = env::var("ENABLE_WRITE_HANDLERS")
.expect("ENABLE_WRITE_HANDLERS env var should be specified")
.parse::<bool>()
.expect("Can't parse ENABLE_WRITE_HANDLERS");
HttpServer::new(move || {
let mut app = App::new()
.route("/planets", web::get().to(handlers::get_planets))
.route("/planets/{planet_id}", web::get().to(handlers::get_planet))
.route(
"/planets/{planet_id}/image",
web::get().to(handlers::get_image_of_planet),
)
.route("/events", web::get().to(handlers::sse))
.route("/", web::get().to(handlers::index))
.data(Arc::clone(&planet_service))
.data(Arc::clone(&rate_limiting_service));
if enable_write_handlers {
app = app
.route("/planets", web:
.route(
"/planets/{planet_id}",
web:
)
.route(
"/planets/{planet_id}",
web::delete().to(handlers::delete_planet),
);
}
app
})
.bind("0.0.0.0:9000")?
.run()
.await
}
Обработка ошибок
Обработка ошибок имплементирована в соответствии с документацией.
Запуск и тестирование
Локально проект может быть запущен двумя способами:
- с использованием Docker Compose (docker-compose.yml):
docker compose up (или docker-compose up в более старых версиях Docker) - без использования Docker
Запустите приложение с помощью cargo run (в этом случае сервис mongodb-redis в docker-compose.yml должен быть отключён)
CI/CD
CI/CD сконфигурировано с помощью GitHub Actions workflow, который собирает Docker образ приложения и разворачивает его на Google Cloud Platform.
Для доступа к REST API развёрнутого приложения вы можете использовать один из доступных GET эндпоинтов, например:
GET http://demo.romankudryashov.com:9000/planets
Пишущие методы REST API недоступны на production среде.
Заключение
В этой статье я показал как начать работу с MongoDB и Redis и примеры их использования в Rust приложении. Не стесняйтесь написать мне, если нашли какие-либо ошибки в статье или исходном коде. Спасибо за внимание!
Источник статьи: https://habr.com/ru/post/568856/