Цель
У нас есть краулер, который обкачивает страницы. Это может быть поисковый бот Google, который ходит по сайтам, скачивает данные, кладет в базу и индексирует, или какой-нибудь агрегатор: аптек, маркетплейсов и т.д.Задача в том, что краулер должен работать и не положить сервис, который он обкачивает.
Код для начала работы:
import asyncio
from dataclasses import dataclass
from typing import Optional
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
async def start(pool):
await asyncio.sleep(5)
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
except KeyboardInterrupt:
loop.close()
if __name__ == '__main__':
main()
Краулеру нужно посетить и скачать много страниц, следовательно, много раз обратиться к ресурсу. Мы можем позволить себе отправлять много запросов, но сервис, на который мы приходим, может не выдержать большой нагрузки. Поэтому к источнику данных нужно ходить управляемо — сделать rate-limit.
Если в какой-то момент задача прервалась, или мы сами решили остановить краулер, нужно сделать корректную и аккуратную остановку работы. Для этого начатые задачи должны завершиться, а новые задачи из очереди должны прекратить поступать.
Исходный код
У нас есть сущность Pool. Эта сущность умеет управлять количеством запросов в единицу времени. Pool принимает:- max_rate — максимальное количество запросов
- interval — интервал. Если мы передаем значения max_rate = 5 и interval = 1, в секунду может исполняться 5 запросов
- concurrent_level — обозначает допустимое количество параллельных запросов
Планировщик
Для начала нужно написать что-то, что позволит делать ровно 5 запросов в секунду, не обращая внимание на время запроса. Для этого мы запустим планировщик, который назовем scheduler. Он будет просыпаться раз в секунду и ставить количество задач, равное max_rate. Планировщик не ждет их исполнения, просто создает 5 задач каждую секунду.Дополним class Pool и напишем функцию scheduler:
from task import Task
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
pass
Обратите внимание на две вещи:
- функция бесконечная, пока работает наш краулер
- раз в период функция выполняет max_rate раз какое-то действие
Задача для краулера
Scheduler должен откуда-то взять задачи, которые нужно запланировать. Для этого нам нужно сделать очередь, которую мы возьмем из библиотеки asyncio. Примитив называется asyncio.Queue(). В class Pool дописываем:class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
Теперь мы просыпаемся раз в интервал и получаем количество задач, равное max_rate. Но нужно что-то сделать, чтобы они исполнялись.
Для этого в asyncio есть функция create_task. Она запускает выполнение корутины, но при этом не дожидается ее исполнения, а создает фоновую задачу. В create_task передадим метод perform.
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(task.perform))
await asyncio.sleep(self.interval)
Пробный запуск
Давайте попробуем все это запустить. Сделаем функцию start и таким же образом запустим scheduler. Нам нужно не ждать его, а просто запустить в фоне корутину с помощью create_task:async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
def start(self):
self.is_running = True
asyncio.create_task(self._scheduler())
В будущем для корректного завершения работы краулера нужно завершить работу scheduler. Для этого нужно вызвать cancel у задачи, поэтому возвращаемое значение из create_task мы сохраняем в переменную scheduler_task:
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
Выставим rate-limit на 3 и внутри start запустим наш Pool:
def start(self):
self.is_running = True
self._scheduler_task = asyncio.create_task(self._scheduler())
async def start(pool):
pool = Pool(3)
pool.start()
await asyncio.sleep(5)
Запускаем и видим, что ничего не произошло:
Это потому, что внутри очереди ничего нет. Мы сделали старт и поспали 5 секунд, а на момент окончания задачи у нас осталась фоновая задача scheduler.
Промежуточный итог
- У нас есть Pool с параметрами:
— ограничение количества запросов max_rate
— интервал активизации планировщика interval
— максимальное количество параллельных запросов concurrent_level - Мы написали планировщик scheduler, который работает постоянно, просыпается раз в объявленный интервал, достает из очереди max_rate задач и запускает их исполнение.
- Задача task — просто дата-класс с функцией perform. Для описания поведения задачи нужно создать класс-наследник и в нем переопределить perform.
- Еще мы написали функцию start, в которой выставили признак работы is_running и в фоне запустили наш планировщик.
Функции put и join
Перед тем, как запустить Pool, попробуем положить туда задачку. Для этого напишем функцию put, которая принимает задачу и кладет ее в нашу внутреннюю очередь.Дополнительно добавим tid (task_id) и print в код задачи:
import asyncio
from dataclasses import dataclass
@dataclass
class Task:
tid: int
async def perform(self, pool):
print('start perform', self.tid)
await asyncio.sleep(3)
print('complete perform', self.tid)
И добавим 10 задач перед стартом pool:
async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await asyncio.sleep(5)
Добавим еще кое-что. У стандартной библиотеки queue есть метод join. Тогда краулер будет ждать не 5 секунд, как мы указали в начале, а до тех пор, пока очередь не опустеет:
async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
Запустим и посмотрим, что произойдет:
Хотя все зависло, планировщик работал.
Вы можете увидеть, что задача выполняется 3 секунды. И, несмотря на то, что предыдущие задачи еще не завершились, планировщик все равно создает новые. Это плохо, потому что если API отвечает медленнее, чем мы шлем к нему запросы, есть вероятность «положить» сервис. Эту проблему мы решим чуть позже.
Чтобы join отработал, нужно помечать задачи выполненными. Не будем усложнять код scheduler и сделаем отдельную функцию _worker. В нее перенесем perform и ниже добавим self._queue.task_done(). Это означает, что задачу мы выполнили:
async def _worker(self, task: Task):
await task.perform(self)
self._queue.task_done()
Обратите внимание, что _worker вызывается без await, потому что scheduler не должен ждать его завершения. Иначе он не успеет запланировать задачи.
В scheduler вместо perform нужно передать _worker и task:
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Снова попробуем запустить:
Программа завершилась, но осталось предупреждение о том, что scheduler остался работать в фоне. Функцию stop напишем чуть позже.
Semaphore
На этом этапе видим, что:- метод start запускает наш Pool и планировщик scheduler
- планировщик раз в секунду ставит новые задачи и запускает _worker
- _worker эти задачи выполняет
- метод join ждет, пока очередь не станет пустой
В таком случае количество параллельных запросов к сервису за interval будет больше rate_limit. Поэтому нужно ограничить количество параллельных запросов. Для этого нам потребуется переменная concurrent_level, которая по умолчанию равна rate_limit.
В asyncio есть примитив синхронизации Semaphore. С его помощью можно ограничить количество параллельных исполняемых worker. Если количество запланированных задач больше заданного значения, мы ждем их исполнения. В нашем примере задач 3.
Объявим Semaphore и передадим в него либо concurrent_level, либо max_rate.
Когда worker начинает исполняться, нам нужно занять Semaphore. Для этого используем «асинхронный контекстный менеджер»: async with self._sem. Мы занимаем Semaphore, пока не закончатся операции ниже — await task.perform(self) и self._queue.task_done().
async def _worker(self, task: Task):
async with self._sem:
await task.perform(self)
self._queue.task_done()
Добавим Semaphore внутрь scheduler, чтобы scheduler не запускал новые worker'ы, если количество параллельных worker'ов уже достигло максимума:
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
async with self._sem:
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Запускаем:
Мы добавили 3 задачи и ждем, пока они исполнятся. Таким образом мы соблюдаем максимальное параллельное количество запросов.
Остановка фонового планировщика
У нас осталась проблема с корректным завершением планировщика. После завершения остановки краулера появляется предупреждение о незавершенной корутине.Чтобы этого не было, напишем функцию stop:
async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
Теперь после того, как внутри пула закончатся задачи, его нужно корректно остановить. Добавим метод stop в конце функции start:
async def start():
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
await pool.stop()
Попробуем:
Теперь все работает корректно.
Мы остановили планировщик, когда задачи в очереди закончились. Но если мы остановим краулер в процессе работы, начнут появляться предупреждения о том, что какая-то задача не завершилась:
А чем больше время выполнения perform, тем больше будет таких уведомлений.
Поэтому нам нужно ожидать, когда все worker завершатся. Для этого введем дополнительную переменную, обозначающую количество параллельно работающих worker: concurrent_workers. Изначально она равна 0. При запуске воркера мы увеличиваем concurrent_workers на 1. При выходе, наоборот, уменьшаем на 1:
async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
Теперь нужно как-то сказать функции stop, что все параллельные worker завершились. Это произойдет, когда is_running будет false и concurrent_workers станет равной 0.
Для этого есть примитив синхронизации Event. В нашем коде мы добавим его в Pool и назовем stop_event. Это переменная, на которой можно ждать await self._stop_event.wait() до тех пор, пока кто-то не вызовет self._stop_event.set():
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
self._sem = asyncio.Semaphore(concurrent_level or max_rate)
self._cuncurrent_workers = 0
self._stop_event = asyncio.Event()
Если равна, то все worker завершили свою работу, планировщик отменен и не создает новые задачи. В таком случае все компоненты Pool остановлены или завершили свою работу — программу можно завершать.
Но если concurrent_workers не равна 0, нам нужно внутри метода stop подождать событие stop_event:
async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
if self._cuncurrent_workers != 0:
await self._stop_event.wait()
Когда Pool остановлен, последний работающий worker должен отправить уведомление:
async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
if not self.is_running and self._cuncurrent_workers == 0:
self._stop_event.set()
Обновим функцию main, чтобы все корректно работало:
def main():
loop = asyncio.get_event_loop()
pool = Pool(3)
try:
loop.run_until_complete(start(pool))
except KeyboardInterrupt:
loop.run_until_complete(pool.stop())
loop.close()
Теперь все работает. После нажатия Ctrl + C выполняются оставшиеся задачи, и программа завершается:
Работа краулера на примере обкачки нашего блога на Хабре
Мы реализовали механику пула на нашей абстрактной задачке task.Для следующего этапа я подготовил задачу FetchTask.
fetch_task
Внутри функции parcer есть переменная soup, которая объявлена как soup = BeautifulSoup(data, ’lxml’). Дам небольшие пояснения.
BeautifulSoup — парсер для анализа HTML/XML.
lxml — реализация HTML/XML парсера. Из-за GIL мы специально запускаем res внутри функции perform через executor:
async def perform(self, pool):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as resp:
print(self.url, resp.status)
data = await resp.text()
res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
None, self.parser, data
)
for task in res:
await pool.put(task)
GIL — блокировка, которая запрещает параллельные потоки в Python. Но если вы пишите расширение на С, есть возможность «отпустить» GIL.
Парсер lxml написан на С. У себя под капотом он умеет отпускать GIL и выполняться в отдельном потоке. Это относится и к некоторым другим расширениям: https://lxml.de/2.0/FAQ.html#id1
В fetch_task также переопределяем функцию perform, в которой нужно сходить в сеть. Для этого я взял aiohttp client.
В задаче FetchTask мы идем на указанный URL, оттуда получаем данные и запускаем executor для их обработки. Нужно взять все ссылки в документе, перейти на них и тоже обкачать:
def parser(self, data: str) -> List['FetchTask']:
if self.depth + 1 > MAX_DEPTH:
return []
soup = BeautifulSoup(data, 'lxml')
res = []
for link in soup.find_all('a', href=True):
new_url = URL(link['href'])
if new_url.host is None and new_url.path.startswith('/'):
new_url = URL.build(
scheme=self.url.scheme,
host=self.url.host,
path=new_url.path,
query_string=new_url.query_string
)
if new_url in PARSED_URLS:
continue
PARSED_URLS.add(new_url)
res.append(FetchTask(
tid=self.tid,
url=new_url,
depth=self.depth + 1
))
return res
В конце мы добавляем в результат новую задачу и увеличиваем на 1 глубину depth.
Например, когда мы поставили задачку habr.com, глубина была равна 1. Мы скачали этот документ, в котором есть и другие ссылки: блоги Mail.ru, Yandex или KTS. Когда мы стали обкачивать следующие страницы, глубина увеличилась до 2. Этот параметр нужен для ограничения количества обкачиваемых ресурсов, фактически — глубины.
Обратите внимание, что у нас есть список посещенных страничек PARSED_URLS. Так мы не будем дважды посещать одни и те же страницы.
Теперь импортируем задачи в краулер из fetch_task и изменяем start:
async def start(pool):
await pool.put(
FetchTask(URL('https://habr.com/ru/company/kts/blog/'), 1)
)
pool.start()
await pool.join()
await pool.stop()
Выставляем 3 запроса в секунду и смотрим, как наш краулер потихоньку обкачивает Хабр:
Пишем свой Google, или асинхронный краулер с rate limits на Python
Привет! Меня зовут Александр, я руковожу backend-разработкой в КТS . Сегодня расскажу, как написать асинхронный краулер. Такая задача часто встречается на практике, когда нужно реализовать...
habr.com