Пишем свой Google, или асинхронный краулер с rate limits на Python

Kate

Administrator
Команда форума

Цель​

У нас есть краулер, который обкачивает страницы. Это может быть поисковый бот Google, который ходит по сайтам, скачивает данные, кладет в базу и индексирует, или какой-нибудь агрегатор: аптек, маркетплейсов и т.д.

Задача в том, что краулер должен работать и не положить сервис, который он обкачивает.

Код для начала работы:

import asyncio
from dataclasses import dataclass
from typing import Optional

class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level

async def start(pool):
await asyncio.sleep(5)

def main():
loop = asyncio.get_event_loop()

try:
loop.run_until_complete(start())
except KeyboardInterrupt:
loop.close()

if __name__ == '__main__':
main()
Краулеру нужно посетить и скачать много страниц, следовательно, много раз обратиться к ресурсу. Мы можем позволить себе отправлять много запросов, но сервис, на который мы приходим, может не выдержать большой нагрузки. Поэтому к источнику данных нужно ходить управляемо — сделать rate-limit.

Если в какой-то момент задача прервалась, или мы сами решили остановить краулер, нужно сделать корректную и аккуратную остановку работы. Для этого начатые задачи должны завершиться, а новые задачи из очереди должны прекратить поступать.

Исходный код​

У нас есть сущность Pool. Эта сущность умеет управлять количеством запросов в единицу времени. Pool принимает:

  • max_rate — максимальное количество запросов
  • interval — интервал. Если мы передаем значения max_rate = 5 и interval = 1, в секунду может исполняться 5 запросов
  • concurrent_level — обозначает допустимое количество параллельных запросов
max_rate и concurrent_level могут не совпадать, когда время выполнения запроса больше, чем interval. Например, мы делаем 5 запросов в секунду, как заявлено в переменных, но API все равно отвечает медленнее. Чтобы не положить сервис, мы вводим переменную concurrent_level.

Планировщик​

Для начала нужно написать что-то, что позволит делать ровно 5 запросов в секунду, не обращая внимание на время запроса. Для этого мы запустим планировщик, который назовем scheduler. Он будет просыпаться раз в секунду и ставить количество задач, равное max_rate. Планировщик не ждет их исполнения, просто создает 5 задач каждую секунду.

Дополним class Pool и напишем функцию scheduler:

from task import Task

class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False

async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
pass
Обратите внимание на две вещи:

  • функция бесконечная, пока работает наш краулер
  • раз в период функция выполняет max_rate раз какое-то действие

Задача для краулера​

Scheduler должен откуда-то взять задачи, которые нужно запланировать. Для этого нам нужно сделать очередь, которую мы возьмем из библиотеки asyncio. Примитив называется asyncio.Queue(). В class Pool дописываем:

class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
Теперь мы просыпаемся раз в интервал и получаем количество задач, равное max_rate. Но нужно что-то сделать, чтобы они исполнялись.

Для этого в asyncio есть функция create_task. Она запускает выполнение корутины, но при этом не дожидается ее исполнения, а создает фоновую задачу. В create_task передадим метод perform.

async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(task.perform))
await asyncio.sleep(self.interval)

Пробный запуск​

Давайте попробуем все это запустить. Сделаем функцию start и таким же образом запустим scheduler. Нам нужно не ждать его, а просто запустить в фоне корутину с помощью create_task:

async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)

def start(self):
self.is_running = True
asyncio.create_task(self._scheduler())
В будущем для корректного завершения работы краулера нужно завершить работу scheduler. Для этого нужно вызвать cancel у задачи, поэтому возвращаемое значение из create_task мы сохраняем в переменную scheduler_task:

class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
Выставим rate-limit на 3 и внутри start запустим наш Pool:

def start(self):
self.is_running = True
self._scheduler_task = asyncio.create_task(self._scheduler())

async def start(pool):
pool = Pool(3)
pool.start()
await asyncio.sleep(5)


Запускаем и видим, что ничего не произошло:

af332523c7a1e1656a511b7272a33a40.png

Это потому, что внутри очереди ничего нет. Мы сделали старт и поспали 5 секунд, а на момент окончания задачи у нас осталась фоновая задача scheduler.

Промежуточный итог​

  1. У нас есть Pool с параметрами:
    ограничение количества запросов max_rate
    — интервал активизации планировщика interval
    — максимальное количество параллельных запросов concurrent_level
  2. Мы написали планировщик scheduler, который работает постоянно, просыпается раз в объявленный интервал, достает из очереди max_rate задач и запускает их исполнение.
  3. Задача task — просто дата-класс с функцией perform. Для описания поведения задачи нужно создать класс-наследник и в нем переопределить perform.
  4. Еще мы написали функцию start, в которой выставили признак работы is_running и в фоне запустили наш планировщик.

Функции put и join​

Перед тем, как запустить Pool, попробуем положить туда задачку. Для этого напишем функцию put, которая принимает задачу и кладет ее в нашу внутреннюю очередь.

Дополнительно добавим tid (task_id) и print в код задачи:

import asyncio
from dataclasses import dataclass

@dataclass
class Task:
tid: int

async def perform(self, pool):
print('start perform', self.tid)
await asyncio.sleep(3)
print('complete perform', self.tid)
И добавим 10 задач перед стартом pool:

async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await asyncio.sleep(5)
Добавим еще кое-что. У стандартной библиотеки queue есть метод join. Тогда краулер будет ждать не 5 секунд, как мы указали в начале, а до тех пор, пока очередь не опустеет:

async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
Запустим и посмотрим, что произойдет:

9012880f61a256f73434d0a6b0b85082.png

Хотя все зависло, планировщик работал.

Вы можете увидеть, что задача выполняется 3 секунды. И, несмотря на то, что предыдущие задачи еще не завершились, планировщик все равно создает новые. Это плохо, потому что если API отвечает медленнее, чем мы шлем к нему запросы, есть вероятность «положить» сервис. Эту проблему мы решим чуть позже.

Чтобы join отработал, нужно помечать задачи выполненными. Не будем усложнять код scheduler и сделаем отдельную функцию _worker. В нее перенесем perform и ниже добавим self._queue.task_done(). Это означает, что задачу мы выполнили:

async def _worker(self, task: Task):
await task.perform(self)
self._queue.task_done()
Обратите внимание, что _worker вызывается без await, потому что scheduler не должен ждать его завершения. Иначе он не успеет запланировать задачи.

В scheduler вместо perform нужно передать _worker и task:

async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Снова попробуем запустить:

4fbe7cdc9e34e61906ac8e0e0c4911e5.png

Программа завершилась, но осталось предупреждение о том, что scheduler остался работать в фоне. Функцию stop напишем чуть позже.

Semaphore​

На этом этапе видим, что:

  • метод start запускает наш Pool и планировщик scheduler
  • планировщик раз в секунду ставит новые задачи и запускает _worker
  • _worker эти задачи выполняет
  • метод join ждет, пока очередь не станет пустой
Если время выполнения задач больше интервала активизации планировщика (interval), он накидывает дополнительные задачи сверху тех, которые еще не выполнились.

В таком случае количество параллельных запросов к сервису за interval будет больше rate_limit. Поэтому нужно ограничить количество параллельных запросов. Для этого нам потребуется переменная concurrent_level, которая по умолчанию равна rate_limit.

В asyncio есть примитив синхронизации Semaphore. С его помощью можно ограничить количество параллельных исполняемых worker. Если количество запланированных задач больше заданного значения, мы ждем их исполнения. В нашем примере задач 3.

Объявим Semaphore и передадим в него либо concurrent_level, либо max_rate.

Когда worker начинает исполняться, нам нужно занять Semaphore. Для этого используем «асинхронный контекстный менеджер»: async with self._sem. Мы занимаем Semaphore, пока не закончатся операции ниже — await task.perform(self) и self._queue.task_done().

async def _worker(self, task: Task):
async with self._sem:
await task.perform(self)
self._queue.task_done()
Добавим Semaphore внутрь scheduler, чтобы scheduler не запускал новые worker'ы, если количество параллельных worker'ов уже достигло максимума:

async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
async with self._sem:
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Запускаем:

dcf4edb5630306b9666845df2d35623a.png

Мы добавили 3 задачи и ждем, пока они исполнятся. Таким образом мы соблюдаем максимальное параллельное количество запросов.

Остановка фонового планировщика​

У нас осталась проблема с корректным завершением планировщика. После завершения остановки краулера появляется предупреждение о незавершенной корутине.

Чтобы этого не было, напишем функцию stop:

async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
Теперь после того, как внутри пула закончатся задачи, его нужно корректно остановить. Добавим метод stop в конце функции start:

async def start():
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
await pool.stop()
Попробуем:

b31d654dc0486886ea313bff205f385e.png

Теперь все работает корректно.

Мы остановили планировщик, когда задачи в очереди закончились. Но если мы остановим краулер в процессе работы, начнут появляться предупреждения о том, что какая-то задача не завершилась:

f7e3bd7d0678b93bf3e8de7da68dd03b.png

А чем больше время выполнения perform, тем больше будет таких уведомлений.

Поэтому нам нужно ожидать, когда все worker завершатся. Для этого введем дополнительную переменную, обозначающую количество параллельно работающих worker: concurrent_workers. Изначально она равна 0. При запуске воркера мы увеличиваем concurrent_workers на 1. При выходе, наоборот, уменьшаем на 1:

async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
Теперь нужно как-то сказать функции stop, что все параллельные worker завершились. Это произойдет, когда is_running будет false и concurrent_workers станет равной 0.

Для этого есть примитив синхронизации Event. В нашем коде мы добавим его в Pool и назовем stop_event. Это переменная, на которой можно ждать await self._stop_event.wait() до тех пор, пока кто-то не вызовет self._stop_event.set():

class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
self._sem = asyncio.Semaphore(concurrent_level or max_rate)
self._cuncurrent_workers = 0
self._stop_event = asyncio.Event()
Если равна, то все worker завершили свою работу, планировщик отменен и не создает новые задачи. В таком случае все компоненты Pool остановлены или завершили свою работу — программу можно завершать.

Но если concurrent_workers не равна 0, нам нужно внутри метода stop подождать событие stop_event:

async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
if self._cuncurrent_workers != 0:
await self._stop_event.wait()
Когда Pool остановлен, последний работающий worker должен отправить уведомление:

async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
if not self.is_running and self._cuncurrent_workers == 0:
self._stop_event.set()
Обновим функцию main, чтобы все корректно работало:

def main():
loop = asyncio.get_event_loop()
pool = Pool(3)
try:
loop.run_until_complete(start(pool))
except KeyboardInterrupt:
loop.run_until_complete(pool.stop())
loop.close()
Теперь все работает. После нажатия Ctrl + C выполняются оставшиеся задачи, и программа завершается:

02e3629ddba3d00ff12fa045147b682c.png

Работа краулера на примере обкачки нашего блога на Хабре​

Мы реализовали механику пула на нашей абстрактной задачке task.

Для следующего этапа я подготовил задачу FetchTask.

fetch_task
Внутри функции parcer есть переменная soup, которая объявлена как soup = BeautifulSoup(data, ’lxml’). Дам небольшие пояснения.

BeautifulSoup — парсер для анализа HTML/XML.

lxml — реализация HTML/XML парсера. Из-за GIL мы специально запускаем res внутри функции perform через executor:

async def perform(self, pool):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as resp:
print(self.url, resp.status)
data = await resp.text()
res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
None, self.parser, data
)
for task in res:
await pool.put(task)
GIL — блокировка, которая запрещает параллельные потоки в Python. Но если вы пишите расширение на С, есть возможность «отпустить» GIL.

Парсер lxml написан на С. У себя под капотом он умеет отпускать GIL и выполняться в отдельном потоке. Это относится и к некоторым другим расширениям: https://lxml.de/2.0/FAQ.html#id1

В fetch_task также переопределяем функцию perform, в которой нужно сходить в сеть. Для этого я взял aiohttp client.

В задаче FetchTask мы идем на указанный URL, оттуда получаем данные и запускаем executor для их обработки. Нужно взять все ссылки в документе, перейти на них и тоже обкачать:

def parser(self, data: str) -> List['FetchTask']:
if self.depth + 1 > MAX_DEPTH:
return []
soup = BeautifulSoup(data, 'lxml')
res = []
for link in soup.find_all('a', href=True):
new_url = URL(link['href'])
if new_url.host is None and new_url.path.startswith('/'):
new_url = URL.build(
scheme=self.url.scheme,
host=self.url.host,
path=new_url.path,
query_string=new_url.query_string
)
if new_url in PARSED_URLS:
continue
PARSED_URLS.add(new_url)
res.append(FetchTask(
tid=self.tid,
url=new_url,
depth=self.depth + 1
))
return res
В конце мы добавляем в результат новую задачу и увеличиваем на 1 глубину depth.

Например, когда мы поставили задачку habr.com, глубина была равна 1. Мы скачали этот документ, в котором есть и другие ссылки: блоги Mail.ru, Yandex или KTS. Когда мы стали обкачивать следующие страницы, глубина увеличилась до 2. Этот параметр нужен для ограничения количества обкачиваемых ресурсов, фактически — глубины.

Обратите внимание, что у нас есть список посещенных страничек PARSED_URLS. Так мы не будем дважды посещать одни и те же страницы.

Теперь импортируем задачи в краулер из fetch_task и изменяем start:

async def start(pool):
await pool.put(
FetchTask(URL('https://habr.com/ru/company/kts/blog/'), 1)
)
pool.start()
await pool.join()
await pool.stop()
Выставляем 3 запроса в секунду и смотрим, как наш краулер потихоньку обкачивает Хабр:

1eb4b19a37f38488bdbda86f7e5259e0.png


 
Сверху