Как мы переносили базу Clickhouse между географически удаленными дата-центрами

Kate

Administrator
Команда форума
Наш сервис функционирует 24/7, т.е. просто выключить и перенести все копированием нельзя. Мы спланировали переезд, но из-за некоторых ограничений самого Clickhouse (а точнее его схемы репликации) не смогли реализовать его в соответствии с планом.

Изначально мы предполагали, что сможем к каждой шарде в старом дата-центре подключить реплику из нового, подождем синхронизации, ну а потом просто отключим реплики в старом дата-центре. Но из-за географической удаленности мы получили слишком большую задержку передачи данных между дата-центрами, несмотря на скорость в 2-2,5 Гбита. В результате в ZooKeeper, который координировал репликацию, объем данных вырос на порядок. Так что процесс пришлось остановить, иначе он грозил затормозить продакшн. Пришлось подыскивать другие способы переезда.

В базе сохраняются все запросы, приходящие в наш сервис. В Clickhouse мы храним два типа данных:

  • “Сырые данные” за две недели. За это время накапливается около 6 Тб.
  • “Агрегаты” - важные для нас результаты обработки сырых данных. Агрегаты занимают порядка 300 Гб.
Нашим первым решением стало дождаться процесса агрегации данных и перенести в новый дата-центр только их; ну а пока это происходит, запускать новые шарды и переносить сервисы. Так что задача свелась к тому, что необходимо было найти способ перенести данные и не потерять ни байта из тех 300 Гб агрегатов.

Перечень возможных подходов мы нашли в статье от Альтинити: https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/. В нашем случае было два варианта дальнейших действий: физический перенос или копирование с помощью clickhouse-copier.

Далее поговорим про каждый из вариантов.

Физический перенос данных​

Первый возможный вариант - низкоуровневый (физический) перенос данных.

Clickhouse хранит данные в виде Parts, которые можно физически скопировать с одного сервера на другой в виде файлов. Для этого на исходном сервере надо выполнить подобный запрос и отцепить все Parts последовательно для всех таблиц:

#DETACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> DETACH PARTITION|PART <PARTITION_EXPRESSION>
После чего скопировать файлы из директорий ClickHouse_data_dir/<DATABASE_NAME>/<TABLE_NAME>/detached на новый сервер, а потом выполнить обратный запрос:

#ATTACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> ATTACH PARTITION|PART <PARTITION_EXPRESSION>
Мы тестировали этот вариант, но в одном из экспериментов у нас не сошлось количество столбцов в таблицах на исходном и конечном серверах. Возможно, что-то некорректно смерджилось. Мы не стали разбираться, а тем более рисковать на проде, и решили использовать более безопасный метод.

Копирование с помощью clickhouse-copier​

Второй вариант - копирование на более высоком уровне с помощью утилиты от Clickhouse. В БД на источнике выполняется SELECT, а затем вставляется INSERT-ом с другой стороны. При этом очередь всех своих заданий и прогресс утилита хранит в zookeeper, что исключает проблемы.

В открытых источниках информации по этому пути оказалось крайне мало. Пришлось разбираться с нуля.

Способ показался более надежным, хотя и не без особенностей. Например, оказалось, что если запустить утилиту на стороне источника, то данные в результате копирования не сходятся. Возможно, виноваты были сетевые проблемы - было ощущение, что некоторые фрагменты данных просто не достигают нового дата-центра и утилита это не фиксирует. А вот если запустить ее на стороне приемника, то данные сходились на 100%.

Сам процесс переноса данных одной шарды занимал 3-4 часа, еще около получаса требовалось на различные сопутствующие манипуляции, в частности копирование уже внутри сервера, т.к. данные мы изначально переносили во “временную” таблицу. Выполнить копирование напрямую в продакшн-таблицу мы не могли. Из-за того, что в процессе копирования кластер фактически состоит из машин в двух дата-центрах, в этот момент у нас происходило бы задвоение статистики. Так что мы копировали данные из Майами во временную таблицу в Детройте (преодолевали географическое расстояние), а затем уже внутри дата-центра в Детройте проводили слияние с продакшеном, выполняя вставки по 500-600 млн. столбцов.

Несмотря на предосторожности, мы все же столкнулись с парой инцидентов, когда клиенты вышли за установленные в админке лимиты, поскольку во время копирования у них отображалась неполная статистика. Но суммарно убытки были долларов 20.

Копирование на практике​

Процесс копирования выглядит следующим образом.

Для начала работы нам нужно положить в zookeeper данные для утилиты.

zkCli.sh -server localhost:2181 create /clickhouse/copytasks ""
Далее нам нужно создать схему копирования - файл schema.xml.

<clickhouse>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>IP</host>
<port>9000</port>
<user>default</user>
<password></password>
</replica>
</shard>
</source_cluster>

<destination_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>IP</host>
<port>9000</port>
<user>default</user>
<password></password>
</replica>
</shard>
</destination_cluster>
</remote_servers>

<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>1</max_workers>

<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>

<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>

<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>

<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->

<table_hits>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>database</database_pull>
<table_pull>table_local</table_pull>
<cluster_push>destination_cluster</cluster_push>
<database_push>database</database_push>
<table_push>table_local1</table_push>
<engine>
ENGINE=ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
partition by toYYYYMMDD(sspRequestDate)
order by (sspRequestDate, dspId, sspRequestCountry, endpointId)
</engine>
<sharding_key>rand()</sharding_key>
</table_hits>

</tables>
</clickhouse>

Здесь:

  • секция clickhouse описывает сервера - откуда и куда мы производим копирование;
  • секция tables описывает какие таблицы мы копируем;
  • в table_hits находится само описание процесса.
После этого отправляем этот файл в zookeeper:

zkCli.sh -server localhost:2181 create /clickhouse/copytasks/description "`cat schema.xml`"
Далее переходим на сервер Clickhouse и создаем файл zookeeper.xml, необходимый для работы утилиты:

<clickhouse>
<logger>
<level>trace</level>
<size>100M</size>
<count>3</count>
</logger>

<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</clickhouse>
Ну и запускаем командой:

clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks
После того как утилита отработает стоит удаляем данные из zookeeper:

zkCli.sh -server localhost:2181 deleteall /clickhouse/copytasks
После того, как мы проверили, что все данные совпали, остается выполнить SQL команду:

INSERT INTO table_local SELECT * FROM table_local1;

В общей сложности переезд занял дней 10 (с учетом того, что параллельно мы переносили другие части сервиса). Надеемся, что эта статья поможет вам сэкономить время на поиске подхода к решению подобных задач.


 
Сверху