Реализация Peer-to-Peer сетей на Rust: создание протокола обмена файлами с помощью libp2p

Kate

Administrator
Команда форума
Сегодня я расскажу, как на Rust создать собственный протокол обмена файлами, используя библиотеку libp2p.

Создаем проект​

Начнем с самого начала. Открываем терминал и выполняем следующие команды:

cargo new p2p-file-exchange
cd p2p-file-exchange

Команды создадут новый проект на Rust с именем p2p-file-exchange и переключат текущий каталог на него.

После открываем Cargo.toml и добавляем следующие зависимости:

[dependencies]
libp2p = { version = "0.51", features = ["tcp-tokio", "dns", "noise", "mplex", "identify"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-trait = "0.1"
futures = "0.3"

  • libp2p: основная библиотека для создания P2P-сетей.
  • tokio: асинхронный рантайм для Rust.
  • serde и serde_json: для сериализации и десериализации данных.
  • async-trait: позволяет использовать асинхронные трейты.
  • futures: для работы с асинхронными потоками.

Основная структура​

Теперь перейдём к src/main.rs. Начнем с импорта необходимых модулей:

use libp2p::{
identity,
PeerId,
Swarm,
Multiaddr,
Transport,
noise,
tcp::TokioTcpConfig,
mplex,
core::upgrade,
request_response::{
ProtocolName, ProtocolSupport, RequestResponse, RequestResponseConfig,
RequestResponseCodec, RequestResponseEvent, RequestResponseMessage,
RequestResponseProtocol,
},
};
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use std::error::Error;
use futures::prelude::*;
Импортируем основные компоненты из libp2p, а также необходимые библиотеки.

Для обмена файлами нужен простой протокол. Создадим структуры для запросов и ответов:

#[derive(Debug, Serialize, Deserialize)]
struct FileRequest {
filename: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct FileResponse {
content: Vec<u8>,
}
Эти структуры определяют формат запросов и ответов. FileRequest содержит имя файла, который запрашивается, а FileResponse — содержимое этого файла в виде массива байтов.

Теперь определим протокол:

#[derive(Debug, Clone)]
struct FileProtocol();

#[derive(Clone)]
struct FileCodec();

#[derive(Debug, Clone, PartialEq, Eq)]
struct FileProtocolName;

impl ProtocolName for FileProtocolName {
fn protocol_name(&self) -> &[u8] {
b"/p2pfile/1.0.0"
}
}

#[async_trait]
impl RequestResponseCodec for FileCodec {
type Protocol = FileProtocolName;
type Request = FileRequest;
type Response = FileResponse;

async fn read_request<T>(&mut self, _: &FileProtocolName, io: &mut T) -> io::Result<Self::Request>
where
T: async_std:🇮🇴:Read + Unpin,
{
let req: FileRequest = serde_json::from_reader(io)
.map_err(|e| async_std:🇮🇴:Error::new(async_std:🇮🇴:ErrorKind::InvalidData, e))?;
Ok(req)
}

async fn read_response<T>(&mut self, _: &FileProtocolName, io: &mut T) -> io::Result<Self::Response>
where
T: async_std:🇮🇴:Read + Unpin,
{
let res: FileResponse = serde_json::from_reader(io)
.map_err(|e| async_std:🇮🇴:Error::new(async_std:🇮🇴:ErrorKind::InvalidData, e))?;
Ok(res)
}

async fn write_request<T>(&mut self, _: &FileProtocolName, io: &mut T, req: Self::Request) -> io::Result<()>
where
T: async_std:🇮🇴:Write + Unpin,
{
serde_json::to_writer(io, &req)
.map_err(|e| async_std:🇮🇴:Error::new(async_std:🇮🇴:ErrorKind::InvalidData, e))
}

async fn write_response<T>(&mut self, _: &FileProtocolName, io: &mut T, res: Self::Response) -> io::Result<()>
where
T: async_std:🇮🇴:Write + Unpin,
{
serde_json::to_writer(io, &res)
.map_err(|e| async_std:🇮🇴:Error::new(async_std:🇮🇴:ErrorKind::InvalidData, e))
}
}
Здесь определяем протокол обмена файлами. FileProtocolName задает уникальное имя протокола, а FileCodec отвечает за сериализацию и десериализацию запросов и ответов с использованием JSON

Теперь настроим транспорт с шифрованием и мультиплексированием:

fn create_transport(local_key: &identity::Keypair) -> impl Transport<
Output = impl libp2p::swarm::ConnectionHandler,
Error = impl std::error::Error,
> + Clone {
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(local_key)
.expect("Signing libp2p-noise static DH keypair failed.");

TokioTcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(mplex::MplexConfig::new())
.boxed()
}
В этой функции создаем транспортный слой для нашей P2P-сети. Используем TCP с поддержкой Tokio для асинхронного взаимодействия, Noise для шифрования соединений и Mplex для мультиплексирования потоков.

Swarm — это мозг P2P-сети. Он управляет соединениями и обрабатывает события.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Генерируем ключи и PeerId
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);

// Создаём транспорт
let transport = create_transport(&local_key);

// Настраиваем протокол обмена файлами
let protocols = std::iter::eek:nce((
FileProtocolName,
ProtocolSupport::Full,
));
let codec = FileCodec();
let cfg = RequestResponseConfig::default();
let request_response = RequestResponse::new(codec, protocols, cfg);

// Создаём Swarm
let mut swarm = Swarm::new(transport, request_response, local_peer_id);

// Слушаем на всех доступных интерфейсах и портах
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

println!("Swarm запущен. Ожидаем подключения...");

// Основной цикл
loop {
match swarm.next().await.unwrap() {
RequestResponseEvent::Message { peer, message } => {
match message {
RequestResponseMessage::Request { request, channel, .. } => {
println!("Получен запрос на файл: {}", request.filename);
// Здесь должна быть логика поиска и чтения файла
// Для примера отправим заглушку
let content = match std::fs::read(&request.filename) {
Ok(data) => data,
Err(_) => b"Файл не найден".to_vec(),
};
let response = FileResponse { content };
swarm.respond(channel, response)?;
},
RequestResponseMessage::Response { response, .. } => {
println!("Получен ответ с содержимым файла: {} байт", response.content.len());
// Здесь можно обработать полученные данные
},
}
},
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
eprintln!("Ошибка при отправке запроса к {}: {:?}", peer, error);
},
RequestResponseEvent::InboundFailure { peer, error, .. } => {
eprintln!("Ошибка при получении запроса от {}: {:?}", peer, error);
},
RequestResponseEvent::ResponseSent { peer, .. } => {
println!("Ответ отправлен к {}", peer);
},
_ => {}
}
}
}
В этом фрагменте инициализируем наш Swarm, который управляет всеми соединениями и взаимодействиями в сети. Генерируем ключи для нашего узла и получаем его PeerId. Затем настраиваем транспорт с использованием Noise для шифрования и Mplex для мультиплексирования потоков. Также определяем протокол обмена файлами и начинаем прослушивание на всех доступных интерфейсах и портах.

Теперь добавим возможность подключаться к другим узлам. Для этого нам понадобится Multiaddr другого узла. Допустим, есть другой узел, запущенный на локальной машине с портом 8080 и PeerId 12D3KooW.... Добавим его в код:

// Подключаемся к другому узлу
let remote: Multiaddr = "/ip4/127.0.0.1/tcp/8080/p2p/12D3KooW...".parse()?;
swarm.dial(remote)?;
println!("Подключение к {}", remote)?;
В этом фрагменте создаем Multiaddr для удаленного узла и инициируем подключение к нему с помощью swarm.dial(remote). Не забываем заменить 12D3KooW... на актуальный PeerId удаленного узла.

Теперь все готово для запуска. Открываем два терминала, в каждом запускаем проект:

cargo run
В одном терминале узел будет слушать подключения, а в другом подключаться к первому.

Терминал 1 (слушающий узел):

Local peer id: PeerId("12D3KooW...")
Swarm запущен. Ожидаем подключения...
Терминал 2 (подключающий узел):

Local peer id: PeerId("16Uiu2HAm...")
Swarm запущен. Ожидаем подключения...
Подключение к /ip4/127.0.0.1/tcp/8080/p2p/12D3KooW...
Когда подключение установлено, узлы смогут обмениваться запросами и ответами на файлы.

 
Сверху