Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- query = f"""
- -- Pulls the full history of every chat that had any activity within the requested date range.
- -- The user field is matched against a regex that recognises a mobile_profile_id: a match is placed
- -- into the second tuple element, any other value into the first one.
- WITH current_data AS (
-     SELECT
-         message_id,
-         chat_id,
-         session_id,
-         from,
-         timestamp,
-         type,
-         if(match(user, '.+-.+-.+-.+-.+'), (NULL, user), (user, NULL)) AS mobile_cids_and_operator_names,
-         connector_id,
-         channel_id,
-         card_type,
-         card_id,
-         json_data,
-         reply_to,
-         text AS text_table
-     FROM RAW.RAW_MOS_AISUZ_chatMessages
-     WHERE chat_id IN (
-         -- chats with at least one message dated inside [start_query_exec_date, execution_date], both ends inclusive
-         SELECT DISTINCT chat_id
-         FROM RAW.RAW_MOS_AISUZ_chatMessages
-         WHERE toDate(timestamp) BETWEEN toDate('{start_query_exec_date}') AND toDate('{execution_date}')
-     )
- ),
- -- Spreads json_data out into separate columns and joins them back onto current_data.
- -- Where a value may arrive in any one of several fields, the candidates are merged with coalesce.
- -- The rating is passed through toUInt8OrNull because strings occasionally end up in that field by mistake.
- -- 'Пользователь' in the first tuple element is replaced with NULL, so current_operator holds only operator names and gaps.
- wide_data AS (
-     SELECT
-         message_id,
-         chat_id,
-         session_id,
-         from,
-         timestamp,
-         type,
-         connector_id,
-         channel_id,
-         card_type,
-         session_data_operator,
-         user_agent,
-         sso_id,
-         reply_to,
-         action,
-         cardFrom AS card_from,
-         crafttalk_dialog_id,
-         nullif(tupleElement(mobile_cids_and_operator_names, 1), 'Пользователь') AS current_operator,
-         tupleElement(mobile_cids_and_operator_names, 2) AS mobile_profile_id,
-         coalesce(toString(card_id), cardId) AS card_id,
-         coalesce(text_table, text_json) AS text,
-         coalesce(client_id, clientId) AS client_id,
-         coalesce(elk_person_id, elkPersonId) AS elk_person_id,
-         coalesce(mos_id, mosId) AS mos_id,
-         coalesce(url, nullif(JSONExtractString(citInfo, 'location'), '')) AS url_location,
-         toUInt8OrNull(coalesce(
-             toString(rate),
-             RatingRequestGroup1,
-             RatingRequestGroup2,
-             RatingRequestGroup3,
-             RatingRequestGroup4,
-             RatingRequestGroup5
-         )) AS rate
-     FROM (
-         -- one column per known json key; NULL when the key is absent from the message
-         SELECT
-             message_id,
-             if(has(json_keys, 'action'), json_values[indexOf(json_keys, 'action')], NULL) AS action,
-             if(has(json_keys, 'card-from'), json_values[indexOf(json_keys, 'card-from')], NULL) AS cardFrom,
-             if(has(json_keys, 'client_id'), json_values[indexOf(json_keys, 'client_id')], NULL) AS client_id,
-             if(has(json_keys, 'clientId'), json_values[indexOf(json_keys, 'clientId')], NULL) AS clientId,
-             if(has(json_keys, 'card-id'), json_values[indexOf(json_keys, 'card-id')], NULL) AS cardId,
-             if(has(json_keys, 'code'), json_values[indexOf(json_keys, 'code')], NULL) AS code,
-             if(has(json_keys, 'crafttalk_operator_name'), json_values[indexOf(json_keys, 'crafttalk_operator_name')], NULL) AS session_data_operator,
-             if(has(json_keys, 'crafttalk_dialog_id'), json_values[indexOf(json_keys, 'crafttalk_dialog_id')], NULL) AS crafttalk_dialog_id,
-             if(has(json_keys, 'citInfo'), json_values[indexOf(json_keys, 'citInfo')], NULL) AS citInfo,
-             if(has(json_keys, 'elk_person_id'), json_values[indexOf(json_keys, 'elk_person_id')], NULL) AS elk_person_id,
-             if(has(json_keys, 'elkPersonId'), json_values[indexOf(json_keys, 'elkPersonId')], NULL) AS elkPersonId,
-             if(has(json_keys, 'mos_id'), json_values[indexOf(json_keys, 'mos_id')], NULL) AS mos_id,
-             if(has(json_keys, 'mosId'), json_values[indexOf(json_keys, 'mosId')], NULL) AS mosId,
-             if(has(json_keys, 'rate'), json_values[indexOf(json_keys, 'rate')], NULL) AS rate,
-             -- for the RatingRequestGroupN keys only the presence of the key matters: it maps to the literal 'N'
-             if(has(json_keys, 'RatingRequestGroup1'), '1', NULL) AS RatingRequestGroup1,
-             if(has(json_keys, 'RatingRequestGroup2'), '2', NULL) AS RatingRequestGroup2,
-             if(has(json_keys, 'RatingRequestGroup3'), '3', NULL) AS RatingRequestGroup3,
-             if(has(json_keys, 'RatingRequestGroup4'), '4', NULL) AS RatingRequestGroup4,
-             if(has(json_keys, 'RatingRequestGroup5'), '5', NULL) AS RatingRequestGroup5,
-             if(has(json_keys, 'sso_id'), json_values[indexOf(json_keys, 'sso_id')], NULL) AS sso_id,
-             if(has(json_keys, 'text'), json_values[indexOf(json_keys, 'text')], NULL) AS text_json,
-             if(has(json_keys, 'url'), json_values[indexOf(json_keys, 'url')], NULL) AS url,
-             if(has(json_keys, 'user_agent'), json_values[indexOf(json_keys, 'user_agent')], NULL) AS user_agent
-         FROM (
-             -- split the (key, value) tuples into two parallel arrays
-             SELECT
-                 message_id,
-                 json_tuples,
-                 arrayMap(pair -> tupleElement(pair, 1), json_tuples) AS json_keys,
-                 arrayMap(pair -> tupleElement(pair, 2), json_tuples) AS json_values
-             FROM (
-                 SELECT
-                     message_id,
-                     JSONExtractKeysAndValues(assumeNotNull(json_data), 'String') AS json_tuples
-                 FROM current_data
-                 WHERE json_data IS NOT NULL
-             )
-         )
-     ) AS json
-     -- RIGHT JOIN keeps the messages whose json_data is NULL as well
-     RIGHT JOIN current_data USING message_id
-     ORDER BY chat_id, timestamp ASC
- ),
- -- With rows grouped by chat_id it is convenient to fill the gaps in some columns with ffill or bfill.
- -- We accept that within one chat these values may change (even when they should not, e.g. the rating).
- -- The offsets computed here are: for lag, the distance back to the nearest non-NULL value;
- -- for lead, the distance forward to the first non-NULL value of the chat. A missing distance becomes 0.
- offsets AS (
-     SELECT
-         message_id,
-         row_num,
-         ifNull(row_num - max_current_operator_rownum_dropna, 0) AS current_operator_lag_offset,
-         ifNull(min_current_operator_rownum_dropna - row_num, 0) AS current_operator_lead_offset,
-         ifNull(row_num - max_session_data_operator_rownum_dropna, 0) AS session_data_operator_lag_offset,
-         ifNull(min_session_data_operator_rownum_dropna - row_num, 0) AS session_data_operator_lead_offset,
-         ifNull(row_num - max_connector_id_rownum_dropna, 0) AS connector_id_lag_offset,
-         ifNull(min_connector_id_rownum_dropna - row_num, 0) AS connector_id_lead_offset,
-         ifNull(row_num - max_channel_id_rownum_dropna, 0) AS channel_id_lag_offset,
-         ifNull(min_channel_id_rownum_dropna - row_num, 0) AS channel_id_lead_offset,
-         ifNull(row_num - max_crafttalk_dialog_id_rownum_dropna, 0) AS crafttalk_dialog_id_lag_offset,
-         ifNull(min_crafttalk_dialog_id_rownum_dropna - row_num, 0) AS crafttalk_dialog_id_lead_offset,
-         ifNull(row_num - max_url_location_rownum_dropna, 0) AS url_location_lag_offset,
-         ifNull(min_url_location_rownum_dropna - row_num, 0) AS url_location_lead_offset,
-         ifNull(row_num - max_client_id_rownum_dropna, 0) AS client_id_lag_offset,
-         ifNull(min_client_id_rownum_dropna - row_num, 0) AS client_id_lead_offset,
-         ifNull(row_num - max_mobile_profile_id_rownum_dropna, 0) AS mobile_profile_id_lag_offset,
-         ifNull(min_mobile_profile_id_rownum_dropna - row_num, 0) AS mobile_profile_id_lead_offset,
-         ifNull(row_num - max_elk_person_id_rownum_dropna, 0) AS elk_person_id_lag_offset,
-         ifNull(min_elk_person_id_rownum_dropna - row_num, 0) AS elk_person_id_lead_offset,
-         ifNull(row_num - max_mos_id_rownum_dropna, 0) AS mos_id_lag_offset,
-         ifNull(min_mos_id_rownum_dropna - row_num, 0) AS mos_id_lead_offset,
-         ifNull(row_num - max_sso_id_rownum_dropna, 0) AS sso_id_lag_offset,
-         ifNull(min_sso_id_rownum_dropna - row_num, 0) AS sso_id_lead_offset,
-         ifNull(row_num - max_user_agent_rownum_dropna, 0) AS user_agent_lag_offset,
-         ifNull(min_user_agent_rownum_dropna - row_num, 0) AS user_agent_lead_offset,
-         ifNull(row_num - max_rate_rownum_dropna, 0) AS rate_lag_offset,
-         ifNull(min_rate_rownum_dropna - row_num, 0) AS rate_lead_offset
-     FROM (
-         -- The ClickHouse version in use cannot evaluate (var1 - max(var2) OVER window), hence a separate
-         -- subquery for the window functions: 'https://github.com/ClickHouse/ClickHouse/issues/19857'
-         -- max over chat_so_far = row number of the latest non-NULL value up to the current row;
-         -- min over whole_chat  = row number of the first non-NULL value in the chat.
-         SELECT
-             message_id,
-             row_num,
-             max(current_operator_rownum_dropna) OVER chat_so_far AS max_current_operator_rownum_dropna,
-             min(current_operator_rownum_dropna) OVER whole_chat AS min_current_operator_rownum_dropna,
-             max(session_data_operator_rownum_dropna) OVER chat_so_far AS max_session_data_operator_rownum_dropna,
-             min(session_data_operator_rownum_dropna) OVER whole_chat AS min_session_data_operator_rownum_dropna,
-             max(connector_id_rownum_dropna) OVER chat_so_far AS max_connector_id_rownum_dropna,
-             min(connector_id_rownum_dropna) OVER whole_chat AS min_connector_id_rownum_dropna,
-             max(channel_id_rownum_dropna) OVER chat_so_far AS max_channel_id_rownum_dropna,
-             min(channel_id_rownum_dropna) OVER whole_chat AS min_channel_id_rownum_dropna,
-             max(crafttalk_dialog_id_rownum_dropna) OVER chat_so_far AS max_crafttalk_dialog_id_rownum_dropna,
-             min(crafttalk_dialog_id_rownum_dropna) OVER whole_chat AS min_crafttalk_dialog_id_rownum_dropna,
-             max(url_location_rownum_dropna) OVER chat_so_far AS max_url_location_rownum_dropna,
-             min(url_location_rownum_dropna) OVER whole_chat AS min_url_location_rownum_dropna,
-             max(client_id_rownum_dropna) OVER chat_so_far AS max_client_id_rownum_dropna,
-             min(client_id_rownum_dropna) OVER whole_chat AS min_client_id_rownum_dropna,
-             max(mobile_profile_id_rownum_dropna) OVER chat_so_far AS max_mobile_profile_id_rownum_dropna,
-             min(mobile_profile_id_rownum_dropna) OVER whole_chat AS min_mobile_profile_id_rownum_dropna,
-             max(elk_person_id_rownum_dropna) OVER chat_so_far AS max_elk_person_id_rownum_dropna,
-             min(elk_person_id_rownum_dropna) OVER whole_chat AS min_elk_person_id_rownum_dropna,
-             max(mos_id_rownum_dropna) OVER chat_so_far AS max_mos_id_rownum_dropna,
-             min(mos_id_rownum_dropna) OVER whole_chat AS min_mos_id_rownum_dropna,
-             max(sso_id_rownum_dropna) OVER chat_so_far AS max_sso_id_rownum_dropna,
-             min(sso_id_rownum_dropna) OVER whole_chat AS min_sso_id_rownum_dropna,
-             max(user_agent_rownum_dropna) OVER chat_so_far AS max_user_agent_rownum_dropna,
-             min(user_agent_rownum_dropna) OVER whole_chat AS min_user_agent_rownum_dropna,
-             max(rate_rownum_dropna) OVER chat_so_far AS max_rate_rownum_dropna,
-             min(rate_rownum_dropna) OVER whole_chat AS min_rate_rownum_dropna
-         FROM (
-             -- keep the row number only where the column is filled, NULL otherwise
-             SELECT
-                 chat_id,
-                 message_id,
-                 timestamp,
-                 row_num,
-                 if(isNotNull(current_operator), row_num, NULL) AS current_operator_rownum_dropna,
-                 if(isNotNull(session_data_operator), row_num, NULL) AS session_data_operator_rownum_dropna,
-                 if(isNotNull(connector_id), row_num, NULL) AS connector_id_rownum_dropna,
-                 if(isNotNull(channel_id), row_num, NULL) AS channel_id_rownum_dropna,
-                 if(isNotNull(crafttalk_dialog_id), row_num, NULL) AS crafttalk_dialog_id_rownum_dropna,
-                 if(isNotNull(url_location), row_num, NULL) AS url_location_rownum_dropna,
-                 if(isNotNull(client_id), row_num, NULL) AS client_id_rownum_dropna,
-                 if(isNotNull(mobile_profile_id), row_num, NULL) AS mobile_profile_id_rownum_dropna,
-                 if(isNotNull(elk_person_id), row_num, NULL) AS elk_person_id_rownum_dropna,
-                 if(isNotNull(mos_id), row_num, NULL) AS mos_id_rownum_dropna,
-                 if(isNotNull(sso_id), row_num, NULL) AS sso_id_rownum_dropna,
-                 if(isNotNull(user_agent), row_num, NULL) AS user_agent_rownum_dropna,
-                 if(isNotNull(rate), row_num, NULL) AS rate_rownum_dropna
-             FROM (
-                 -- number the messages of each chat in timestamp order
-                 SELECT
-                     message_id,
-                     current_operator,
-                     session_data_operator,
-                     connector_id,
-                     channel_id,
-                     url_location,
-                     client_id,
-                     mobile_profile_id,
-                     elk_person_id,
-                     mos_id,
-                     sso_id,
-                     user_agent,
-                     rate,
-                     timestamp,
-                     crafttalk_dialog_id,
-                     row_number() OVER (PARTITION BY chat_id ORDER BY timestamp) AS row_num,
-                     chat_id
-                 FROM wide_data
-             )
-         )
-         -- ORDER BY timestamp inside the window could arrange rows with equal timestamps differently from row_num,
-         -- so the window is ordered by row_num to reproduce the row order of the subquery.
-         WINDOW
-             whole_chat AS (PARTITION BY chat_id),
-             chat_so_far AS (PARTITION BY chat_id ORDER BY row_num ASC)
-     )
- )
- -- Main part of the query. multiIf is used where, within a chat_id, a forward fill alone is not enough:
- -- (* * 10 * 15 *) --> (* * 10 10 15 15), and a back fill of the leading gap is needed as well:
- -- --> (10 10 10 10 15 15) (asterisks are gaps). If a chat_id has no non-NULL value at all, every row stays NULL.
- -- current_operator is forward-filled only.
- -- INSERT INTO INTELLIGENCE.INTELLIGENCE_MOS_AISUZ_chatMessages
- SELECT message_id, chat_id, session_id, timestamp, open_time, close_time,
- ifNull(current_operator, lag_current_operator) AS current_operator,
- -- For each column below: rows before the first filled value take that first value (bfill),
- -- filled rows keep their own value, the remaining rows take the latest preceding filled value (ffill).
- multiIf(session_data_operator_lead_offset > 0, lead_session_data_operator,
- isNotNull(session_data_operator), session_data_operator,
- lag_session_data_operator) AS session_data_operator,
- multiIf(connector_id_lead_offset > 0, lead_connector_id,
- isNotNull(connector_id), connector_id,
- lag_connector_id) AS connector_id,
- multiIf(channel_id_lead_offset > 0, lead_channel_id,
- isNotNull(channel_id), channel_id,
- lag_channel_id) AS channel_id,
- multiIf(url_location_lead_offset > 0, lead_url_location,
- isNotNull(url_location), url_location,
- lag_url_location) AS url_location,
- multiIf(rate_lead_offset > 0, lead_rate,
- isNotNull(rate), rate,
- lag_rate) AS rate,
- multiIf(crafttalk_dialog_id_lead_offset > 0, lead_crafttalk_dialog_id,
- isNotNull(crafttalk_dialog_id), crafttalk_dialog_id,
- lag_crafttalk_dialog_id) AS crafttalk_dialog_id,
- multiIf(client_id_lead_offset > 0, lead_client_id,
- isNotNull(client_id), client_id,
- lag_client_id) AS client_id,
- multiIf(mobile_profile_id_lead_offset > 0, lead_mobile_profile_id,
- isNotNull(mobile_profile_id), mobile_profile_id,
- lag_mobile_profile_id) AS mobile_profile_id,
- multiIf(elk_person_id_lead_offset > 0, lead_elk_person_id,
- isNotNull(elk_person_id), elk_person_id,
- lag_elk_person_id) AS elk_person_id,
- multiIf(mos_id_lead_offset > 0, lead_mos_id,
- isNotNull(mos_id), mos_id,
- lag_mos_id) AS mos_id,
- multiIf(sso_id_lead_offset > 0, lead_sso_id,
- isNotNull(sso_id), sso_id,
- lag_sso_id) AS sso_id,
- multiIf(user_agent_lead_offset > 0, lead_user_agent,
- isNotNull(user_agent), user_agent,
- lag_user_agent) AS user_agent,
- from, type, action, reply_to, card_id, card_type, card_from, text
- FROM (SELECT message_id, chat_id, session_id, timestamp, current_operator, from, type, action, reply_to, card_id, card_type, card_from, text,
- session_data_operator, session_data_operator_lead_offset, connector_id, connector_id_lead_offset, channel_id, channel_id_lead_offset,
- url_location, url_location_lead_offset, rate, rate_lead_offset, crafttalk_dialog_id, crafttalk_dialog_id_lead_offset, client_id,
- client_id_lead_offset, mobile_profile_id, mobile_profile_id_lead_offset, elk_person_id, elk_person_id_lead_offset, mos_id, mos_id_lead_offset,
- sso_id, sso_id_lead_offset, user_agent, user_agent_lead_offset,
- -- first and last message timestamp of the chat (the window frame below covers the whole chat)
- min(timestamp) OVER chat_window_ordered AS open_time,
- max(timestamp) OVER chat_window_ordered AS close_time,
- -- lag offsets are never negative; lead offsets are negative after the first filled row, so they are clamped to 0
- lagInFrame(current_operator, current_operator_lag_offset) OVER chat_window_ordered AS lag_current_operator,
- leadInFrame(session_data_operator, if(session_data_operator_lead_offset < 0, 0, session_data_operator_lead_offset)) OVER chat_window_ordered AS lead_session_data_operator,
- lagInFrame(session_data_operator, session_data_operator_lag_offset) OVER chat_window_ordered AS lag_session_data_operator,
- leadInFrame(connector_id, if(connector_id_lead_offset < 0, 0, connector_id_lead_offset)) OVER chat_window_ordered AS lead_connector_id,
- lagInFrame(connector_id, connector_id_lag_offset) OVER chat_window_ordered AS lag_connector_id,
- leadInFrame(channel_id, if(channel_id_lead_offset < 0, 0, channel_id_lead_offset)) OVER chat_window_ordered AS lead_channel_id,
- lagInFrame(channel_id, channel_id_lag_offset) OVER chat_window_ordered AS lag_channel_id,
- leadInFrame(url_location, if(url_location_lead_offset < 0, 0, url_location_lead_offset)) OVER chat_window_ordered AS lead_url_location,
- lagInFrame(url_location, url_location_lag_offset) OVER chat_window_ordered AS lag_url_location,
- leadInFrame(rate, if(rate_lead_offset < 0, 0, rate_lead_offset)) OVER chat_window_ordered AS lead_rate,
- lagInFrame(rate, rate_lag_offset) OVER chat_window_ordered AS lag_rate,
- leadInFrame(crafttalk_dialog_id, if(crafttalk_dialog_id_lead_offset < 0, 0, crafttalk_dialog_id_lead_offset)) OVER chat_window_ordered AS lead_crafttalk_dialog_id,
- lagInFrame(crafttalk_dialog_id, crafttalk_dialog_id_lag_offset) OVER chat_window_ordered AS lag_crafttalk_dialog_id,
- leadInFrame(client_id, if(client_id_lead_offset < 0, 0, client_id_lead_offset)) OVER chat_window_ordered AS lead_client_id,
- lagInFrame(client_id, client_id_lag_offset) OVER chat_window_ordered AS lag_client_id,
- leadInFrame(mobile_profile_id, if(mobile_profile_id_lead_offset < 0, 0, mobile_profile_id_lead_offset)) OVER chat_window_ordered AS lead_mobile_profile_id,
- lagInFrame(mobile_profile_id, mobile_profile_id_lag_offset) OVER chat_window_ordered AS lag_mobile_profile_id,
- leadInFrame(elk_person_id, if(elk_person_id_lead_offset < 0, 0, elk_person_id_lead_offset)) OVER chat_window_ordered AS lead_elk_person_id,
- lagInFrame(elk_person_id, elk_person_id_lag_offset) OVER chat_window_ordered AS lag_elk_person_id,
- leadInFrame(mos_id, if(mos_id_lead_offset < 0, 0, mos_id_lead_offset)) OVER chat_window_ordered AS lead_mos_id,
- lagInFrame(mos_id, mos_id_lag_offset) OVER chat_window_ordered AS lag_mos_id,
- leadInFrame(sso_id, if(sso_id_lead_offset < 0, 0, sso_id_lead_offset)) OVER chat_window_ordered AS lead_sso_id,
- lagInFrame(sso_id, sso_id_lag_offset) OVER chat_window_ordered AS lag_sso_id,
- leadInFrame(user_agent, if(user_agent_lead_offset < 0, 0, user_agent_lead_offset)) OVER chat_window_ordered AS lead_user_agent,
- lagInFrame(user_agent, user_agent_lag_offset) OVER chat_window_ordered AS lag_user_agent
- FROM wide_data JOIN offsets USING message_id
- -- The frame must span the whole chat. With ORDER BY and no explicit frame the window ends at the current row,
- -- so leadInFrame could never reach a following row (bfill would yield NULL) and max(timestamp) would be a
- -- running maximum rather than the chat close time.
- WINDOW chat_window_ordered AS (PARTITION BY chat_id ORDER BY row_num ASC
- ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))
- """
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement