Анализ отзывов с banki.ru

Данный материал представлен в информационно-ознакомительных целях.
Список частей:
-
1
часть: Сбор данных ссылка -
2
часть: Визуализация ссылка -
3
часть: Мониторинг загрузок ссылка -
4
часть: Создание сайта и RESTAPI ссылка
Короткое вступление
Третья часть в серии статей о сборе данных с ресурса banki.ru - отзывов клиентов
Прошлую статью мы закончили на визуализации данных в Grafana
, что мы имеем на данный момент?
- Код, который парсит сайт
- Код, который сохраняет данных в бд
- Код, который визуализирует данные из бд
Не хватает блока, в котором будут мониторится процессы загрузки данных (Парсинга)
Решение проблемы
Предлагаю реализовать некий аудит загрузок, используя 2 таблицы:
-
audit_feed
- основная -
audit_feed_type
- вспомогательная

create table if not exists home.audit_feed ( id integer primary key, start_date timestamptz not null, name varchar(255) not null, finish_date timestamptz null, status varchar(50) not null );
create table if not exists home.audit_feed_type ( id serial primary key, feed_id integer not null, start_date timestamptz not null, url varchar(510) not null, finish_date timestamptz null, status varchar(50) not null, FOREIGN KEY (feed_id) REFERENCES home.audit_feed (id) );
Также добавим последовальность SEQUENCE
, с помощью которой будет проставляться связь между таблицами по время инсерта новых данных
CREATE SEQUENCE IF NOT exists audit_sequence START 1;
Подготовка функций на python
Таблицы и последовательность есть, теперь необходимо обернуть логику их заполнения в python
код
D = Union[datetime, None]
Получаем id
последовательности
def get_sequence_id(): cursor.execute("SELECT nextval('audit_sequence')") sequence_id = cursor.fetchone()[0] return sequence_id
Добавление строки в основную таблицу
def create_flow_audit( sequence_id: int, start_date: datetime, name: str, finish_date: D, status: str ): cursor.execute(f''' insert into {AUDIT_TABLE} (id, start_date, name, finish_date, status) values (%s, %s, %s, %s, %s) ''', (sequence_id, start_date, name, finish_date, status)) conn.commit()
Обновление строки в основной таблице
def update_flow_audit( sequence_id: int, finish_date: datetime, status: str ): cursor.execute(f''' update {AUDIT_TABLE} set finish_date = '{finish_date}', status = '{status}' where id = {sequence_id} ''') conn.commit()
Добавление строки во вспомогательную таблицу
def create_flow_audit_type( feed_id: int, start_date: datetime, url: str, finish_date: D, status: str ): cursor.execute(f''' insert into {AUDIT_TYPE_TABLE} (feed_id, start_date, url, finish_date, status) values (%s, %s, %s, %s, %s) ''', (feed_id, start_date, url, finish_date, status)) conn.commit()
Обновление строки во вспомогательной таблице
def update_flow_audit_type( feed_id: int, finish_date: D, status: str ): cursor.execute(f''' update {AUDIT_TYPE_TABLE} set finish_date = '{finish_date}', status = '{status}' where feed_id = {feed_id} and finish_date is null ''') conn.commit()
Обёртка sql-логики
готова, осталось внести ее в основной код
Выглядеть это будет следующим образом:
def main(): ... # come code sequence_id = get_sequence_id() create_flow_audit(sequence_id, get_datetime(), FLOW_NAME.format(args.name), None, RUNNING) status = SUCCESS for page in range(1, PAGES): url = URL.format(args.name, page) try: create_flow_audit_type(sequence_id, get_datetime(), url, None, RUNNING) ... # come code update_flow_audit_type(sequence_id, get_datetime(), SUCCESS) except (... # come code) as e: update_flow_audit_type(sequence_id, get_datetime(), FAILED) ... # come code status = FAILED update_flow_audit(sequence_id, get_datetime(), status) ... # come code
Проверка данных
select * from home.audit_feed order by id desc limit 10;

select * from home.audit_feed_type where feed_id = 846 order by id desc limit 10;

Вариант развернутого просмотра
select f.id , f.
start_date , f.name , f.finish_date , f.status , ft.start_date as process_st_date , ft.finish_date as process_end_date , ft.status as process_status , ft.url from home.audit_feed f left join home.audit_feed_type ft on f.id = ft.feed_id where f.id = 846 order by f.id desc limit 10;

Визуализация мониторинга процессов
Сформируем sql
отчет по статусу запусков
select AF.name , AF.start_date , AF.finish_date , count(*) filter(where FT.status = 'RUNNING') as running_cnt , count(*) filter(where FT.status = 'SUCCESS') as success_cnt , count(*) filter(where FT.status = 'FAILED') as failed_cnt , round((extract(epoch from (AF.finish_date - AF.start_date))/60)::numeric, 1) as run_time , AF.status from home.audit_feed AF inner join home.audit_feed_type FT on AF.id = FT.feed_id where 1=1 and AF.id in ( select max(id) from home.audit_feed group by name ) group by AF.id , AF.start_date , AF.name , AF.finish_date , AF.status;

Результат в Grafana

На этом шаге мониторинг загрузок можно считать завершенным (учитывая, что в коде присутствует достаточно логгирования), при желании можно добавить некоторые оповещения (алерты) в самой графане.