Роман с Data Science. Как монетизировать большие данные - Роман Владимирович Зыков 23 стр.


Spark

С фреймворком Spark я познакомился в 2012 году, когда приобрел для корпоративной библиотеки Ostrovok.ru видеозаписи конференции по анализу данных Strata. Эту конференцию организует издательство OReilly в США. На одной из лекций я увидел, как Матей Захария (основной автор Spark) рассказывает о преимуществах Spark над чистой реализацией MapReduce на Hadoop. Самое главное преимущество в том, что Spark загружает данные в память, в так называемые отказоустойчивые распределенные датасеты RDD (resilient distributed dataset), и позволяет работать с ними в памяти итеративно. Чистый Hadoop же полагается на дисковую память для каждой пары операций MapReduce-данные читаются с диска, затем сохраняются. Если алгоритм требует применения еще нескольких операций, то для каждой из них придется читать данные с диска и сохранять обратно. Spark же, совершив первую операцию, сохраняет данные в памяти, и последующие операции MapReduce будут работать с этим массивом, пока программа не прикажет явно сохранить их на диск. Это очень важно для задач машинного обучения, где используются итеративные алгоритмы поиска оптимального решения. Все это дало огромный прирост производительности, иногда в 100 раз быстрее классического Hadoop.

Эти идеи из лекции Матея Захарии мне пригодились через несколько лет, когда мы стали писать вторую версию рекомендательного движка Retail Rocket. Тогда как раз вышла версия 1.0.0. Летом 2014 года мы попробовали запустить Spark поверх Hadoop и все получилось. В экспериментах мы достигли 34-кратного ускорения. Была, правда, проблема с производительностью на большом количестве мелких файлов. Мы написали небольшую библиотеку, которая склеивала их при загрузке, и проблема решилась [41]. Сейчас мы по-прежнему используем Spark на нашем Hadoop-кластере и ни о чем не жалеем.

При переходе с чистого Hadoop на «Spark поверх Hadoop» пришлось отказаться от большого количества кода на языке Pig, Hive, Java, Python весь этот зоопарк доставлял большую головную боль, ведь их все должны были знать. Для прототипирования задач машинного обучения мы использовали связку питоновских инструментов: IPython + Pyhs2 (hive-драйвер Python) + Pandas + Sklearn. Со Spark мы смогли начать пользоваться только одним языком программирования для прототипирования экспериментальных функций и для рабочего варианта. Это было очень большое достижение для нашей небольшой команды.

Spark поддерживает четыре языка программирования из коробки Python/Scala/Java/R через API. Сам Spark написан на Scala, поэтому мы выбрали его. В таком случае мы сможем свободно читать исходный код Spark и даже исправлять ошибки в нем (это нам не пригодилось). Кроме того, Scala принадлежит к семейству JVM-языков, что очень удобно при работе с файловой системой Hadoop напрямую через API, так как он написан на Java.

Ниже приведено сравнение языков программирования для Spark, где + и означают плюсы и минусы языка соответственно.


Scala:

+ функциональный, поэтому очень удобный для обработки данных любых объемов;

+ родной для Spark, это важно, если нужно понимать работу Spark изнутри;

+ основан на JVM, что делает его совместимым с Hadoop;

+ строгая типизация, тогда компилятор поможет найти часть ваших ошибок;

 труден в освоении, нам пришлось разработать свою программу обучения для новичков [19];

 сложный наем разработчики на Scala дороже, чем на Java или Python;

 сам язык не настолько распространен, как Java или Python.


Python:

+ популярный;

+ простой;

 динамическая типизация, ошибки, которые мог обнаружить компилятор;

 производительность хуже, чем Scala;

 нет чистой функциональности, как у Scala.


Java:

+ популярный;

+ родной для Hadoop;

+ строгая типизация;

 нефункционален (на самом деле после Java 8 все стало намного лучше).


Когда писалась вторая версия рекомендательного движка Retail Rocket, решение выбрать Scala основным языком программирования далось непросто, потому что этот язык никто не знал. Если бы мне пришлось выбирать сейчас, возможно, я бы посмотрел в сторону Java 8 и выше (версий), поскольку Java-разработчиков проще найти, чем тех, кто знает Scala.

Сейчас Spark идет в сторону дата-фреймов и дата-сетов (dataframe dataset), моду на которые сделала библиотека pandas [42] для Python самая популярная для анализа данных. На них проще писать, но есть один нюанс. Компилятор не может проверить корректность работы с внутренними переменными, что не очень хорошо для больших проектов.

Оптимизация скорости работы

Сейчас Spark идет в сторону дата-фреймов и дата-сетов (dataframe dataset), моду на которые сделала библиотека pandas [42] для Python самая популярная для анализа данных. На них проще писать, но есть один нюанс. Компилятор не может проверить корректность работы с внутренними переменными, что не очень хорошо для больших проектов.

Оптимизация скорости работы

Если пользователя не устраивает скорость ответа аналитической системы, значит, пора оптимизировать скорость работы хранилища. Решение может быть как простым, так и сложным. Первое, что необходимо понять,  можно ли обойтись малой кровью. В реляционных базах данных можно добавить индексы. В базах данных для этого имеются так называемые профайлеры, которые на основе запросов пользователей предложат, какие именно индексы добавить. В колоночной базе данных Clickhouse можно сменить схему партицирования или сделать сэмплирование, когда запрос будет использовать только часть данных. В Hadoop можно выделить больше ресурсов или даже оптимизировать программный код. Я практиковал оба способа.

Кроме этого, есть способ, работающий практически везде. Главный враг скорости запросов это соединение данных (join) в разных таблицах. Например, у нас есть две таблицы: клиенты и заказы, обе таблицы соединяются через id клиента, имеющийся в обеих таблицах. Почему бы не делать это соединение данных периодически, например по ночам, и сохранять результат обратно в хранилище? Это называется материализованным представлением (materialized view). Тогда клиенты будут обращаться не к данным напрямую, а к их представлению. Скорость будет на порядок выше это плюс. Минус такого решения усложнение всей конструкции. Если что-то пойдет не так и таблицы-доноры будут иметь неверные данные, эти представления придется пересчитать. Это нужно будет всегда держать в голове.

Более радикальное решение сменить технологию на ту, которая больше соответствует задачам. Я уже приводил пример, когда с Hadoop мы перевели пользователей на колоночную базу ClickHouse и получили ускорение в десятки и сотни раз.

Еще один дорогой способ ускорить работу аналитической системы сделать апгрейд «железа сервера». В системах Hadoop и Clickhouse это легко сделать, просто добавив дополнительные машины. Такая операция называется горизонтальным партицированием (sharding)  данные разбиваются по записям и распределяются по серверам. Запросы к данным будут выполняться параллельно на нескольких серверах, а затем результаты объединяются. Такая схема теоретически может дать линейное ускорение: два сервера будут работать быстрее в два раза по сравнению с одним сервером и т. д.

Архивация данных и устаревание

Следующая часто возникающая проблема с хранилищами данных большой рост данных, который приводит к нехватке свободного места. Этим нужно постоянно заниматься. Много данных не бывает, но бывает мало бюджета. Какие стратегии существуют?

Первая и самая простая стратегия удаление устаревших данных. Это могут быть данные старше двух лет, старше пяти лет все зависит от ваших задач. В Hadoop можно использовать другую стратегию: менять фактор репликации. По умолчанию он равен трем это означает, что для хранения одного терабайта данных вам понадобится три терабайта на дисках. Это цена надежности, в случае выхода из строя двух дата-нод одновременно вы не потеряете данные. Фактор репликации можно устанавливать индивидуально для файлов: мы можем его уменьшить для более старых. Например, данные младше двух лет фактор равен трем, от двух лет до четырех двум, старше четырех лет удаляются. Подобный подход используется в Facebook. Некоторые компании архивируют старые данные на какой-нибудь дешевый носитель, а если данные понадобятся, они переносятся обратно. Я не поддерживаю такую схему это как прятать вещи в чулан: потом о них забываешь, а если вспомнишь, то искать их там лень.

Второй способ использование кодеков сжатия (табл. 6.2) [43]. Это очень актуально и эффективно работает в Hadoop и Spark. Сжатие данных убивает двух зайцев мы уменьшаем объем занимаемого места на дисках и ускоряем работу с данными: они в разы быстрее гоняются по сети между серверами кластера и быстрее читаются с диска. Чудес не бывает чем сильнее жмет кодек, тем больше ему нужно ресурсов процессора для сжатия данных.

Мы на своем кластере используем кодеки: gzip, bzip2 и lzma. Lzma имеет самую высокую компрессию и используется для архивируемых данных. Gzip используется для всех остальных данных, поступающих в кластер. От конкретного кодека сжатия зависит возможность «разрезания» (split) файла для операции Map без его распаковки. Как уже писалось ранее, для операции Map данные «нарезаются» на блоки размером не больше заданного в настройках Hadoop (block size). Если сжатый файл больше этого размера, то в случае разделимого кодека (splittable codec) его можно разрезать и распаковать по частям на разных нодах кластера параллельно. В противном случае придется распаковывать этот огромный файл целиком а это уже будет гораздо медленнее.

Назад Дальше