На информационном ресурсе применяются рекомендательные технологии (информационные технологии предоставления информации на основе сбора, систематизации и анализа сведений, относящихся к предпочтениям пользователей сети "Интернет", находящихся на территории Российской Федерации)

artydev & Co

1 подписчик

Анализ отзывов с banki.ru [Часть 3] Аудит загрузок

Анализ отзывов с banki.ru

img

Данный материал представлен в информационно-ознакомительных целях.

Список частей:

Короткое вступление

Третья часть в серии статей о сборе данных с ресурса banki.ru - отзывов клиентов

Прошлую статью мы закончили на визуализации данных в Grafana, что мы имеем на данный момент?

  1. Код, который парсит сайт
  2. Код, который сохраняет данных в бд
  3. Код, который визуализирует данные из бд

Не хватает блока, в котором будут мониторится процессы загрузки данных (Парсинга)

Решение проблемы

Предлагаю реализовать некий аудит загрузок, используя 2 таблицы:

  • audit_feed - основная
  • audit_feed_type - вспомогательная
img
create table if not exists home.audit_feed ( 	id integer primary key, 	start_date timestamptz not null, 	name varchar(255) not null, 	finish_date timestamptz null, 	status varchar(50) not null ); 
create table if not exists home.audit_feed_type ( 	id serial primary key, 	feed_id integer not null, 	start_date timestamptz not null, 	url varchar(510) not null, 	finish_date timestamptz null, 	status varchar(50) not null, 	FOREIGN KEY (feed_id) REFERENCES home.audit_feed (id) ); 

Также добавим последовальность SEQUENCE, с помощью которой будет проставляться связь между таблицами по время инсерта новых данных

CREATE SEQUENCE IF NOT exists audit_sequence START 1; 

Подготовка функций на python

Таблицы и последовательность есть, теперь необходимо обернуть логику их заполнения в python код

D = Union[datetime, None] 

Получаем id последовательности

def get_sequence_id():     cursor.execute("SELECT nextval('audit_sequence')")     sequence_id = cursor.fetchone()[0]     return sequence_id 

Добавление строки в основную таблицу

def create_flow_audit(         sequence_id: int,         start_date: datetime,         name: str,         finish_date: D,         status: str ):     cursor.execute(f'''     insert into {AUDIT_TABLE} (id, start_date, name, finish_date, status) values (%s, %s, %s, %s, %s)     ''', (sequence_id, start_date, name, finish_date, status))     conn.commit() 

Обновление строки в основной таблице

def update_flow_audit(         sequence_id: int,         finish_date: datetime,         status: str ):     cursor.execute(f'''     update {AUDIT_TABLE} set finish_date = '{finish_date}', status = '{status}' where id = {sequence_id}     ''')     conn.commit() 

Добавление строки во вспомогательную таблицу

def create_flow_audit_type(         feed_id: int,         start_date: datetime,         url: str,         finish_date: D,         status: str ):     cursor.execute(f'''     insert into {AUDIT_TYPE_TABLE} (feed_id, start_date, url, finish_date, status) values (%s, %s, %s, %s, %s)     ''', (feed_id, start_date, url, finish_date, status))     conn.commit() 

Обновление строки во вспомогательной таблице

def update_flow_audit_type(         feed_id: int,         finish_date: D,         status: str ):     cursor.execute(f'''     update {AUDIT_TYPE_TABLE} set finish_date = '{finish_date}', status = '{status}' where feed_id = {feed_id}     and finish_date is null     ''')     conn.commit() 

Обёртка sql-логики готова, осталось внести ее в основной код

Выглядеть это будет следующим образом:

def main():     ... # come code     sequence_id = get_sequence_id()     create_flow_audit(sequence_id, get_datetime(), FLOW_NAME.format(args.name), None, RUNNING)     status = SUCCESS     for page in range(1, PAGES):         url = URL.format(args.name, page)         try:             create_flow_audit_type(sequence_id, get_datetime(), url, None, RUNNING)             ... # come code             update_flow_audit_type(sequence_id, get_datetime(), SUCCESS)         except (... # come code) as e:             update_flow_audit_type(sequence_id, get_datetime(), FAILED)             ... # come code             status = FAILED     update_flow_audit(sequence_id, get_datetime(), status)     ... # come code 

Проверка данных

select * from home.audit_feed order by id desc limit 10; 
img
select * from home.audit_feed_type where feed_id = 846 order by id desc limit 10; 
img

Вариант развернутого просмотра

select  	f.id 	, f.
start_date , f.name , f.finish_date , f.status , ft.start_date as process_st_date , ft.finish_date as process_end_date , ft.status as process_status , ft.url from home.audit_feed f left join home.audit_feed_type ft on f.id = ft.feed_id where f.id = 846 order by f.id desc limit 10;
img

Визуализация мониторинга процессов

Сформируем sql отчет по статусу запусков

select  	AF.name 	, AF.start_date 	, AF.finish_date 	, count(*) filter(where FT.status = 'RUNNING') as running_cnt 	, count(*) filter(where FT.status = 'SUCCESS') as success_cnt 	, count(*) filter(where FT.status = 'FAILED') as failed_cnt 	, round((extract(epoch from (AF.finish_date - AF.start_date))/60)::numeric, 1) as run_time 	, AF.status from  	home.audit_feed AF 	inner join home.audit_feed_type FT 		on AF.id = FT.feed_id  where  	1=1 	and AF.id in ( 		select max(id) from home.audit_feed group by name 	) group by  	AF.id 	, AF.start_date 	, AF.name 	, AF.finish_date 	, AF.status; 
img

Результат в Grafana

img

На этом шаге мониторинг загрузок можно считать завершенным (учитывая, что в коде присутствует достаточно логгирования), при желании можно добавить некоторые оповещения (алерты) в самой графане.

Ссылка на первоисточник
наверх