Безопасная многопоточность в Rust

Kate

Administrator
Команда форума
Представьте ситуацию: вы решили добавить функционал в некоторую сущность (класс, структуру, ...). Для этого вам понадобилось хранить внутри сущности дополнительные данные. Вы добавляете приватные поля, меняете и читаете их внутри методов, интерфейс методов не меняется, и всё выглядит хорошо. И не просто выглядит, а компилируется, запускается и проходит тесты. Красота... А потом, в один прекрасный день, прод падает. И может быть даже, теряет часть важных данных. Вы с коллегами начинаете искать причину поломки. И оказывается, что та сущность, которую вы дополняли год назад, делится между несколькими потоками, а поля, добавленные вами, не потокобезопасны, и поэтому гонка за доступ к ним в какой-то момент вызывает их повреждение. Неприятно, согласитесь? Хотелось бы иметь инструмент, позволяющий избегать подобных ситуаций. И язык программирования Rust такой инструмент предоставляет.

Содержание статьи​

  1. Многопоточность и её проблемы
  2. Безопасная многопоточность в Rust
  3. Заключение

Многопоточность и её проблемы​

Прежде чем вдаваться в подробности механизмов, предоставляемых Rust, давайте сначала немного разберёмся:

  • Что такое многопоточность?
  • Зачем она нужна?
  • Возникающие проблемы.
Думаю, опытный разработчик многопоточных приложений может пролистать эту часть ничего не потеряв. Для тех же, кто сомневается в своих знаниях:

Что такое многопоточность?​

Определение из википедии: "Многопото́чность — свойство платформы, состоящее в том, что процесс, порождённый в ней, может состоять из нескольких потоков, выполняющихся «параллельно», то есть без предписанного порядка во времени." Давайте попробуем получше разобраться с этим на примере:

5d47912816182d0b457ef2e2190b31a6.png

На схеме выше изображены два потока со своими задачами. "Thread 1" с задачами "Task 1,2,3" и "Thread 2" с задачами "Task A,B,C". В рамках одного потока порядок задач задан однозначно:
В "Thread 1" выполняется "Task 1", затем "Task 2", и потом "Task 3".
В "Thread 2" выполняется "Task A", затем "Task B", и потом "Task C".
А вот между задачами "Task 1,2,3" и "Task A,B,C" порядок не установлен. Это значит, что задача "Task A", например может выполниться до начала выполнения задачи "Task 1", а может начать выполняться после "Task 3". Более того, "Task 1" может выполняться одновременно (параллельно) с любой из задач "Task A,B,C". При каждом выполнении кода, соответствующего схеме, порядок выполнения может быть разным. Такое поведение имеет свои преимущества и недостатки.

Преимущества многопоточности​

Основное преимущество многопоточности - возможность повышения быстродействия программы. Вынося операцию ввода/вывода в отдельный поток, вы можете, не дожидаясь её завершения, выполнять другие задачи. Также, распределение вычислений на несколько потоков позволяет использовать в своих целях больше ресурсов многоядерного ЦПУ.

Помимо этого, иногда требуется выполнить действие, процесс выполнения и результат которого не связаны с текущим потоком выполнения. Не хотелось бы заботиться о том, как внедрить его в текущую логику выполнения, или о том, когда это действие будет выполняться. В таком случае, выполнение действия в отдельном потоке - отличный инструмент для упрощения логики программы.

Недостатки многопоточности​

Основной недостаток - появление новых классов ошибок, которые легко допустить и сложно детектировать. Давайте поговорим о них подробнее.

Взаимная блокировка​

d84ce14ef20b4a1013925fcb016e44c3.jpg

Чаще встречается в английском варианте: deadlock. Это ситуация, при которой несколько потоков находятся в состоянии ожидания ресурсов, занятых друг другом, и ни один из них не может продолжать свое выполнение. Проще понять суть определения на простом примере.

Допустим есть два потока: П1 и П2, и два ресурса: А и Б. Оба потока хотят эксклюзивно завладеть обоими ресурсами. П1 захватывает А, и одновременно с этим П2 захватывает Б. Далее, П1 ожидает освобождения Б, а П2 ожидает освобождения А. Так как ни один из них при этом не собирается освобождать захваченных ресурс, ожидать они будут бесконечно. Программа зависнет.

Если постараться, то можно получить deadlock и в однопоточном приложении. Для этого поток должен попытаться захватить ресурс, которым уже владеет. Но такое случается редко и отлаживается просто, так как воспроизводится при каждом запуске. В многопоточном приложении дела обстоят значительно хуже. Проблема может выстрелить один раз из, скажем, миллиарда. И этот раз, по закону подлости, обязательно будет в самый неудобный момент.

Но в многопоточных приложениях есть класс ошибок, которые допускаются намного чаще. Ведь для получения такой ошибки достаточно разделить между двумя потоками изменяемые данные. Как многие уже догадались, речь идёт о...

Состояние гонки​

d4b58aaf3b3f41f24dc50b430d115dc5.png

По-английски: race condition. Это состояние, при котором работа многопоточной системы зависит от порядка выполнения кода в разных потоках. Как мы отмечали выше, порядок выполнения кода в разных потоках не детерминирован, что делает случайным результат выполнения программы. Давайте рассмотрим состояние гонки на примере.

Пусть у нас в приложении есть два потока П1 и П2. Предположим, что в П1 у нас формируются некоторые текстовые команды, а поток П2 копирует их и выполняет. И вот, в определённый момент, команда демонстрации базы данных "show database" меняется посимвольно на команду сброса мусора "drop garbage". Есть вероятность, что в момент, когда будут заменены 4-5 символов, поток П2 скопирует текущую команду на исполнение. И исполнив "drop database" уничтожит базу данных вашего приложения.

Обратите внимание на фразу "есть вероятность". Это означает, что такая ошибка, скорее всего, не будет отловлена на этапе отладки и тестирования. И это утверждение относится не только к данному примеру, но и, практически к любому другому способу получения гонки данных. А таких способов очень много.

Не подумали о потокобезопасности поля, добавляемого в структуру, доступ к которой имеют несколько потоков? Получили состояние гонки.

Переданный нами коллбэк, неожиданно, вызывается в другом потоке? Потенциальное состояние гонки.

Увеличиваем из нескольких потоков не атомарный счётчик? Когда-нибудь, после 10 увеличений, счётчик окажется равен 8.

Делим данные со сторонней библиотекой? Потенциальное состояние гонки, если нет гарантий, что библиотека не делит данные между другими потоками.

И такие примеры можно приводить бесконечно. Обеспечение надёжности многопоточного приложения требует от программиста держать в голове большое количество информации и не упускать из виду множество факторов. Хорошо бы иметь инструмент, который сам будет следить за корректностью многопоточного кода...

Безопасная многопоточность в Rust​

Давайте попробуем добиться гонки данных в Rust (без unsafe).

Примитивная попытка​

Для начала, самый очевидный вариант: создадим переменную, передадим её в другой поток, а в текущем потоке поменяем.

fn main() {
let mut s = String::from("My text");
println!("{} in main", s);
print_in_other_thread(s);
s.push('!');
println!("{} in main again", s);
}

fn print_in_other_thread(s: String) {
std:🧵:spawn(move || {
println!("{} in other thread", s);
});
}
Ссылка на playground

Компилятор не соберёт данный код с сообщением о том, что в 5-ой строке мы пытаемся одолжить (borrow) переменную, которую переместили в 4-ой строке. Всё верно, ведь мы нарушили принцип владения (ownership). Согласно ему у любых данных в любой момент времени может быть только один владелец - объект, при выходе из области видимости которого, данные перестанут существовать. В строке 4 мы передали владение строкой s в функцию print_in_other_thread, лишив себя, таким образом, возможности дальше использовать s в функции main.

Хорошо, давайте попробуем не передавать владение, а одолжить нашу строку в функцию print_in_other_thread.

Одолжим строку​

Вместо передачи самой строки, передадим ссылку на неё:

fn main() {
let mut s = String::from("My text");
println!("{} in main", s);
print_in_other_thread(&s);
s.push('!');
println!("{} in main again", s);
}

fn print_in_other_thread(s: &String) {
std:🧵:spawn(move || {
println!("{} in other thread", s);
});
}
Ссылка на playground

Теперь компилятор ругается на то, что мы передаём в функцию порождения нового потока std:🧵:spawn() замыкание, содержащее ссылку с не `static временем жизни. Время жизни `static означает, что ссылка живёт на протяжении жизни всей программы. Это верно, например для глобальных переменных. В нашем случае, передавая ссылку на локальную переменную в другой поток, есть шанс, что переменная и её данные в памяти очистятся ещё до того, как эта переменная будет использована в другом потоке. Поэтому, для функции порождения нового потока логично требовать `static время жизни для всех захваченных ссылок. Благо, синтаксис Rust позволяет выражать подобные идеи.

И снова мы потерпели фиаско. Как же нам разделить владение строкой? Давайте воспользуемся контейнером, предназначенным как раз для таких случаев.

Делим указатель на строку через Arc​

Arc (Atomically Reference Counted), так называемый, умный указатель. Он содержит указатель на данные и счётчик своих копий. Данные удаляются только при уничтожении последней копии.

Обернём строку в Arc и посмотрим, что из этого получится:

use std::sync::Arc;

fn main() {
let mut s = Arc::new(String::from("My text"));
println!("{} in main", s);
print_in_other_thread(s.clone());
s.push('!');
println!("{} in main again", s);
std:🧵:sleep(std::time::Duration::from_secs(1));
}

fn print_in_other_thread(s: Arc<String>) {
std:🧵:spawn(move || {
println!("{} in other thread", s);
});
}
Ссылка на playground

На этот раз компилятор ругается на строку 7. Он сообщает, что мы пытаемся использовать мутабельную ссылку на строку, в то время, как Arc позволяет получать только иммутабельную ссылку на своё содержимое. И снова компилятор не позволил нам получить состояние гонки. Но мы так просто не сдадимся и пойдём дальше. Ведь нам известно, что в Rust есть возможность менять данные по иммутабельной ссылке.

Меняем иммутабельные данные​

Для того чтобы значение переменной можно было изменить, имея только иммутабельную ссылку на неё, в Rust предусмотрена концепция внутренней мутабельности (interior mutability). Для применения этой концепции можно обернуть данные в контейнер Cell. На этот раз я буду использовать число (тип i32) вместо строки в качестве данных. Это позволит сделать код лаконичнее, не повлияв на суть примера.

use std::sync::Arc;
use std::cell::Cell;

fn main() {
let mut s = Arc::new(Cell::new(5));
println!("{:?} in main", s);
print_in_other_thread(s.clone());
s.set(42);
println!("{:?} in main again", s);
std:🧵:sleep(std::time::Duration::from_secs(1));
}

fn print_in_other_thread(s: Arc<Cell<i32>>) {
std:🧵:spawn(move || {
println!("{:?} in other thread", s);
});
}
Ссылка на playground

Метод set() у Cell вызывается по иммутабельной ссылке, но, несмотря на это, меняет содержимое ячейки. Казалось бы, это то что нам нужно для достижения гонки данных. Arc позволяет получить иммутабельный доступ к своему содержимому, а Cell позволяет иммутабельное содержимое менять. Но, не тут-то было. Разработчики позаботились о такой ситуации. Что подтверждается ошибкой компиляции, в которой сообщается, что:
Cell<i32> не может быть безопасно разделена между потоками;
так как не реализует трейт Sync;
который требуется для того, чтобы Arc<Cell<i32>> реализовывал Send;
что необходимо для того, чтобы замыкание, переданное в std:🧵:spawn(), реализовывало трейт Send, запрашиваемый этой функцией.

О трейтах
Трейты схожи с концепцией интерфейсов в других языках. Их можно реализовывать на типах, расширяя их функционал. Также, функции могут накладывать ограничение на трейты принимаемаемых аргументов. Ограничения проверяются при компиляции.
Давайте разберёмся подробнее. Посмотрим на сигнатуру функции std:🧵:spawn():

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Из сигнатуры видно, что функция принимает generic функтор типа F, возвращающий объект типа T. На функтор задано ограничение: он должен реализовывать трейт Send и иметь `static время жизни. Об ограничении на время жизни мы говорили выше, когда пытались передать ссылку в другой поток. Сейчас рассмотрим трейты Send и Sync.

Семантика трейта Send следущая: тип можно передать в другой поток только когда он реализует Send. Поэтому, например, функция std:🧵:spawn(), исполняющая принятый функтор в другом потоке, требует реализацию Send.

Реализация трейта Sync на типе говорит о том, что ссылку на этот тип можно делить между потоками. Иными словами, тип является Sync тогда и только тогда, когда ссылка на этот тип является Send. Например, Arc представляет собой умную ссылку и поэтому реализует Send только для Sync типов:

unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
При этом трейты Send и Sync не совсем обычные. Компилятор умеет выводить их автоматически. При этом он следует простому правилу: если все данные в типе реализуют Send, то и сам тип - Send. С Sync аналогично. Для того, чтобы реализовать эти трейты на типе самостоятельно, нужно уведомить компилятор, что мы знаем, что делаем. Для этого используется ключевое слово unsafe.

Давайте попробуем теперь ещё раз разобраться с ошибкой компиляции последнего примера:

error[E0277]: `Cell<i32>` cannot be shared between threads safely
--> src/main.rs:14:5
|
14 | std:🧵:spawn(move || {
| ^^^^^^^^^^^^^^^^^^ `Cell<i32>` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `Cell<i32>`
= note: required because of the requirements on the impl of `Send` for `Arc<Cell<i32>>`
= note: required because it appears within the type `[closure@src/main.rs:14:24: 16:6]`
Итак, std:🧵:spawn() требует замыкание, реализующее Send. В нашем случае это не верно, ведь замыкание - это просто безымянная структура, поля которой - это захваченные данные. В нашем случае захвачен Arc<Cell<i32>>. Так как он не реализует Send, то и замыкание тоже не Send. Об этом нам подсказывает компилятор (предпоследняя строка).

Теперь о том, почему Arc<Cell<i32>> не Send. Тут всё просто: причина в том, что Cell<i32> не Sync. О чём компилятор, также, уведомляет (the trait `Sync` is not implemented for `Cell<i32>`). И как раз то, что Cell не реализует Sync, позволяет компилятору запрещать нам создавать состояние гонки, разделяя ссылки на Cell между потоками.

Таким образом, трейты Send и Sync позволяют описывать и проверять потокобезопасность типов на этапе компиляции. А за счёт автовывода этих трейтов, компилятор не позволит нам поместить непотокобезопасные данные внутрь разделённых на несколько потоков типов. Ведь для этих типов не будет выведен трейт, позволяющий делить их между потоками, и мы получим ошибку при компиляции.

Теперь попробуем получить deadlock.

Получаем deadlock​

Как и обсуждали в описании deadlock, для его получения захватим два ресурса в двух потоках в разном порядке:

use std::sync::Arc;
use std::sync::Mutex;

fn main() {
let r1 = Arc::new(Mutex::new(1));
let r2 = Arc::new(Mutex::new(2));

let r1_clone = r1.clone();
let r2_clone = r2.clone();

std:🧵:spawn(move || {
println!("Lock r2 in other thread");
let _guard2 = r2_clone.lock().unwrap();

println!("Wait second in other thread");
std:🧵:sleep(std::time::Duration::from_secs(1));

println!("Lock r1 in other thread");
let _guard1 = r1_clone.lock().unwrap();

println!("Other thread finished");
});

println!("Lock r1 in main thread");
let _guard1 = r1.lock().unwrap();

println!("Wait second in main thread");
std:🧵:sleep(std::time::Duration::from_secs(1));

println!("Lock r2 in main thread");
let _guard2 = r2.lock().unwrap();

println!("Main thread finished");
}
Ссылка на playground

Ни один из потоков не завершает своего выполнения. Deadlock. К сожалению, Rust никак не защищает от взаимной блокировки. О ней приходится заботиться самостоятельно.

Заключение​

Итак, мы обсудили многопоточность. Она даёт нам возможность использовать вычислительные ресурсы более эффективно, но при этом открывает доступ к двум классам ошибок: состояние гонки и взаимная блокировка.

Компилятор Rust (если не использовать unsafe) не разрешит собрать программу, содержащую состояние гонки. Это значительно снижает когнитивную нагрузку на программиста, и, как заявляют создатели Rust, позволяет использовать многопоточность без страха.

К сожалению, взаимная блокировка в Rust никак не контролируется. Ответственность за её отсутствие остаётся на плечах разработчика.

 
Сверху