Advertisement
Artemii_Kravtsov

Untitled

Oct 7th, 2021
1,049
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. query = f"""
  2. -- забирает целиком историю тех чатов, в которых что-то происходило за последнее время
  3. -- проверяет поле user на совпадение с regex'ом, который опознает mobile_profile_id
  4.  WITH current_data AS (SELECT message_id, chat_id, session_id, from, timestamp, type,  
  5.                               if(match(user, '.+-.+-.+-.+-.+'), (NULL, user), (user, NULL)) AS mobile_cids_and_operator_names,
  6.                               connector_id, channel_id, card_type, card_id, json_data, reply_to, text AS text_table
  7.                          FROM RAW.RAW_MOS_AISUZ_chatMessages
  8.                         WHERE chat_id IN (SELECT DISTINCT chat_id
  9.                                             FROM RAW.RAW_MOS_AISUZ_chatMessages
  10.                                            WHERE toDate(timestamp) >= toDate('{start_query_exec_date}')
  11.                                              AND toDate(timestamp) <= toDate('{execution_date}'))),
  12.                                              
  13. -- раскладывает json_data на столбцы и джойнит с current_data,
  14. -- если значение переменной может содержаться в одном из нескольких полей, то делает coalesce
  15. -- делает toUInt8OrNull к полю с оценкой, потому что в этом поле по ошибке иногда попадаются строчки
  16. -- приравнивает к NULL значения 'Пользователь' в столбце user, после чего, помимо пропусков, там остаются только имена операторов
  17.          wide_data AS (SELECT message_id, chat_id, session_id, from, timestamp, type, connector_id, channel_id, card_type,
  18.                               session_data_operator, user_agent, sso_id, reply_to, action, cardFrom AS card_from, crafttalk_dialog_id,
  19.                               nullif(mobile_cids_and_operator_names.1, 'Пользователь') AS current_operator,
  20.                               mobile_cids_and_operator_names.2 AS mobile_profile_id,
  21.                               coalesce(toString(card_id), cardId) AS card_id,
  22.                               coalesce(text_table, text_json) AS text,
  23.                               coalesce(client_id, clientId) AS client_id,  
  24.                               coalesce(elk_person_id, elkPersonId) AS elk_person_id,
  25.                               coalesce(mos_id, mosId) AS mos_id,
  26.                               coalesce(url, nullif(JSONExtractString(citInfo, 'location'), '')) AS url_location,
  27.                               toUInt8OrNull(coalesce(toString(rate), RatingRequestGroup1, RatingRequestGroup2, RatingRequestGroup3,
  28.                                                      RatingRequestGroup4, RatingRequestGroup5)) as rate
  29.                          FROM (SELECT message_id,
  30.                                       if(has(json_keys, 'action'), arrayElement(json_values, indexOf(json_keys, 'action')), NULL) AS action,
  31.                                       if(has(json_keys, 'card-from'), arrayElement(json_values, indexOf(json_keys, 'card-from')), NULL) AS cardFrom,
  32.                                       if(has(json_keys, 'client_id'), arrayElement(json_values, indexOf(json_keys, 'client_id')), NULL) AS client_id,
  33.                                       if(has(json_keys, 'clientId'), arrayElement(json_values, indexOf(json_keys, 'clientId')), NULL) AS clientId,
  34.                                       if(has(json_keys, 'card-id'), arrayElement(json_values, indexOf(json_keys, 'card-id')), NULL) AS cardId,
  35.                                       if(has(json_keys, 'code'), arrayElement(json_values, indexOf(json_keys, 'code')), NULL) AS code,
  36.                                       if(has(json_keys, 'crafttalk_operator_name'), arrayElement(json_values, indexOf(json_keys, 'crafttalk_operator_name')), NULL) AS session_data_operator,
  37.                                       if(has(json_keys, 'crafttalk_dialog_id'), arrayElement(json_values, indexOf(json_keys, 'crafttalk_dialog_id')), NULL) AS crafttalk_dialog_id,
  38.                                       if(has(json_keys, 'citInfo'), arrayElement(json_values, indexOf(json_keys, 'citInfo')), NULL) AS citInfo,
  39.                                       if(has(json_keys, 'elk_person_id'), arrayElement(json_values, indexOf(json_keys, 'elk_person_id')), NULL) AS elk_person_id,
  40.                                       if(has(json_keys, 'elkPersonId'), arrayElement(json_values, indexOf(json_keys, 'elkPersonId')), NULL) AS elkPersonId,
  41.                                       if(has(json_keys, 'mos_id'), arrayElement(json_values, indexOf(json_keys, 'mos_id')), NULL) AS mos_id,
  42.                                       if(has(json_keys, 'mosId'), arrayElement(json_values, indexOf(json_keys, 'mosId')), NULL) AS mosId,
  43.                                       if(has(json_keys, 'rate'), arrayElement(json_values, indexOf(json_keys, 'rate')), NULL) AS rate,
  44.                                       if(has(json_keys, 'RatingRequestGroup1'), '1', NULL) AS RatingRequestGroup1,
  45.                                       if(has(json_keys, 'RatingRequestGroup2'), '2', NULL) AS RatingRequestGroup2,
  46.                                       if(has(json_keys, 'RatingRequestGroup3'), '3', NULL) AS RatingRequestGroup3,
  47.                                       if(has(json_keys, 'RatingRequestGroup4'), '4', NULL) AS RatingRequestGroup4,
  48.                                       if(has(json_keys, 'RatingRequestGroup5'), '5', NULL) AS RatingRequestGroup5,
  49.                                       if(has(json_keys, 'sso_id'), arrayElement(json_values, indexOf(json_keys, 'sso_id')), NULL) AS sso_id,
  50.                                       if(has(json_keys, 'text'), arrayElement(json_values, indexOf(json_keys, 'text')), NULL) AS text_json,
  51.                                       if(has(json_keys, 'url'), arrayElement(json_values, indexOf(json_keys, 'url')), NULL) AS url,
  52.                                       if(has(json_keys, 'user_agent'), arrayElement(json_values, indexOf(json_keys, 'user_agent')), NULL) AS user_agent
  53.                                  FROM (SELECT message_id,
  54.                                               json_tuples,
  55.                                               arrayMap(x -> x.1, json_tuples) AS json_keys,
  56.                                               arrayMap(x -> x.2, json_tuples) AS json_values
  57.                                          FROM (SELECT message_id,
  58.                                                       JSONExtractKeysAndValues(assumeNotNull(json_data), 'String') AS json_tuples
  59.                                                  FROM current_data
  60.                                                 WHERE json_data IS NOT NULL))) AS json
  61.                    RIGHT JOIN current_data USING message_id
  62.                      ORDER BY chat_id, timestamp ASC),
  63.                    
  64. -- удобно, сделав группировку по chat_id, заполнить пропуски в некоторых переменных методами ffill или bfill.
  65. -- допускаем, что в рамках одного чата значения этих переменных могут поменяться (даже когда этого не должно происходить - например, с оценкой).
  66. -- ниже вычисляются отступы: для функции lag - до ближайшего ненулевого значения, для функции lead - до первого ненулевого значения
  67.            offsets AS (SELECT message_id, row_num,
  68.                               ifNull(row_num - max_current_operator_rownum_dropna, 0) AS current_operator_lag_offset,
  69.                               ifNull(min_current_operator_rownum_dropna - row_num, 0) AS current_operator_lead_offset,        
  70.                               ifNull(row_num - max_session_data_operator_rownum_dropna, 0) AS session_data_operator_lag_offset,
  71.                               ifNull(min_session_data_operator_rownum_dropna - row_num, 0) AS session_data_operator_lead_offset,        
  72.                               ifNull(row_num - max_connector_id_rownum_dropna, 0) AS connector_id_lag_offset,
  73.                               ifNull(min_connector_id_rownum_dropna - row_num, 0) AS connector_id_lead_offset,        
  74.                               ifNull(row_num - max_channel_id_rownum_dropna, 0) AS channel_id_lag_offset,
  75.                               ifNull(min_channel_id_rownum_dropna - row_num, 0) AS channel_id_lead_offset,                              
  76.                               ifNull(row_num - max_crafttalk_dialog_id_rownum_dropna, 0) AS crafttalk_dialog_id_lag_offset,
  77.                               ifNull(min_crafttalk_dialog_id_rownum_dropna - row_num, 0) AS crafttalk_dialog_id_lead_offset,
  78.                               ifNull(row_num - max_url_location_rownum_dropna, 0) AS url_location_lag_offset,
  79.                               ifNull(min_url_location_rownum_dropna - row_num, 0) AS url_location_lead_offset,        
  80.                               ifNull(row_num - max_client_id_rownum_dropna, 0) AS client_id_lag_offset,
  81.                               ifNull(min_client_id_rownum_dropna - row_num, 0) AS client_id_lead_offset,        
  82.                               ifNull(row_num - max_mobile_profile_id_rownum_dropna, 0) AS mobile_profile_id_lag_offset,
  83.                               ifNull(min_mobile_profile_id_rownum_dropna - row_num, 0) AS mobile_profile_id_lead_offset,        
  84.                               ifNull(row_num - max_elk_person_id_rownum_dropna, 0) AS elk_person_id_lag_offset,
  85.                               ifNull(min_elk_person_id_rownum_dropna - row_num, 0) AS elk_person_id_lead_offset,        
  86.                               ifNull(row_num - max_mos_id_rownum_dropna, 0) AS mos_id_lag_offset,
  87.                               ifNull(min_mos_id_rownum_dropna - row_num, 0) AS mos_id_lead_offset,        
  88.                               ifNull(row_num - max_sso_id_rownum_dropna, 0) AS sso_id_lag_offset,
  89.                               ifNull(min_sso_id_rownum_dropna - row_num, 0) AS sso_id_lead_offset,        
  90.                               ifNull(row_num - max_user_agent_rownum_dropna, 0) AS user_agent_lag_offset,
  91.                               ifNull(min_user_agent_rownum_dropna - row_num, 0) AS user_agent_lead_offset,        
  92.                               ifNull(row_num - max_rate_rownum_dropna, 0) AS rate_lag_offset,
  93.                               ifNull(min_rate_rownum_dropna - row_num, 0) AS rate_lead_offset      
  94.                         FROM  (SELECT message_id, row_num,
  95.                                       -- В текущей версии КХ нельзя сделать (var1 - max(var2) OVER window), поэтому - отдельный подзапрос
  96.                                       -- для оконной функции: 'https://github.com/ClickHouse/ClickHouse/issues/19857'
  97.                                       max(current_operator_rownum_dropna) OVER chat_window_ordered AS max_current_operator_rownum_dropna,
  98.                                       min(current_operator_rownum_dropna) OVER chat_window AS min_current_operator_rownum_dropna,        
  99.                                       max(session_data_operator_rownum_dropna) OVER chat_window_ordered AS max_session_data_operator_rownum_dropna,
  100.                                       min(session_data_operator_rownum_dropna) OVER chat_window AS min_session_data_operator_rownum_dropna,        
  101.                                       max(connector_id_rownum_dropna) OVER chat_window_ordered AS max_connector_id_rownum_dropna,
  102.                                       min(connector_id_rownum_dropna) OVER chat_window AS min_connector_id_rownum_dropna,        
  103.                                       max(channel_id_rownum_dropna) OVER chat_window_ordered AS max_channel_id_rownum_dropna,
  104.                                       min(channel_id_rownum_dropna) OVER chat_window AS min_channel_id_rownum_dropna,                              
  105.                                       max(crafttalk_dialog_id_rownum_dropna) OVER chat_window_ordered AS max_crafttalk_dialog_id_rownum_dropna,
  106.                                       min(crafttalk_dialog_id_rownum_dropna) OVER chat_window AS min_crafttalk_dialog_id_rownum_dropna,
  107.                                       max(url_location_rownum_dropna) OVER chat_window_ordered AS max_url_location_rownum_dropna,
  108.                                       min(url_location_rownum_dropna) OVER chat_window AS min_url_location_rownum_dropna,        
  109.                                       max(client_id_rownum_dropna) OVER chat_window_ordered AS max_client_id_rownum_dropna,
  110.                                       min(client_id_rownum_dropna) OVER chat_window AS min_client_id_rownum_dropna,        
  111.                                       max(mobile_profile_id_rownum_dropna) OVER chat_window_ordered AS max_mobile_profile_id_rownum_dropna,
  112.                                       min(mobile_profile_id_rownum_dropna) OVER chat_window AS min_mobile_profile_id_rownum_dropna,        
  113.                                       max(elk_person_id_rownum_dropna) OVER chat_window_ordered AS max_elk_person_id_rownum_dropna,
  114.                                       min(elk_person_id_rownum_dropna) OVER chat_window AS min_elk_person_id_rownum_dropna,        
  115.                                       max(mos_id_rownum_dropna) OVER chat_window_ordered AS max_mos_id_rownum_dropna,
  116.                                       min(mos_id_rownum_dropna) OVER chat_window AS min_mos_id_rownum_dropna,        
  117.                                       max(sso_id_rownum_dropna) OVER chat_window_ordered AS max_sso_id_rownum_dropna,
  118.                                       min(sso_id_rownum_dropna) OVER chat_window AS min_sso_id_rownum_dropna,        
  119.                                       max(user_agent_rownum_dropna) OVER chat_window_ordered AS max_user_agent_rownum_dropna,
  120.                                       min(user_agent_rownum_dropna) OVER chat_window AS min_user_agent_rownum_dropna,        
  121.                                       max(rate_rownum_dropna) OVER chat_window_ordered AS max_rate_rownum_dropna,
  122.                                       min(rate_rownum_dropna) OVER chat_window AS min_rate_rownum_dropna      
  123.                                  FROM (SELECT chat_id, message_id, timestamp, row_num,
  124.                                               isNotNull(current_operator)? row_num : NULL AS current_operator_rownum_dropna,
  125.                                               isNotNull(session_data_operator)? row_num : NULL AS session_data_operator_rownum_dropna,
  126.                                               isNotNull(connector_id)? row_num : NULL AS connector_id_rownum_dropna,
  127.                                               isNotNull(channel_id)? row_num : NULL AS channel_id_rownum_dropna,
  128.                                               isNotNull(crafttalk_dialog_id)? row_num : NULL AS crafttalk_dialog_id_rownum_dropna,
  129.                                               isNotNull(url_location)? row_num : NULL AS url_location_rownum_dropna,
  130.                                               isNotNull(client_id)? row_num : NULL AS client_id_rownum_dropna,
  131.                                               isNotNull(mobile_profile_id)? row_num : NULL AS mobile_profile_id_rownum_dropna,
  132.                                               isNotNull(elk_person_id)? row_num : NULL AS elk_person_id_rownum_dropna,
  133.                                               isNotNull(mos_id)? row_num : NULL AS mos_id_rownum_dropna,
  134.                                               isNotNull(sso_id)? row_num : NULL AS sso_id_rownum_dropna,
  135.                                               isNotNull(user_agent)? row_num : NULL AS user_agent_rownum_dropna,
  136.                                               isNotNull(rate)? row_num : NULL AS rate_rownum_dropna              
  137.                                          FROM (SELECT message_id, current_operator, session_data_operator, connector_id, channel_id, url_location, client_id,
  138.                                                       mobile_profile_id, elk_person_id, mos_id, sso_id, user_agent, rate, timestamp, crafttalk_dialog_id,
  139.                                                       row_number() OVER (PARTITION BY chat_id ORDER BY timestamp) AS row_num, chat_id
  140.                                                  FROM wide_data))
  141.                                -- ORDER BY timestamp в окне может расставить строки в порядке, который будет отличаться от row_num, если это
  142.                                -- строки с одинаковым timestamp. Поэтому сейчас нужно делать ORDER BY row_num, чтобы воспроизвести порядок строчек из подзапроса
  143.                                WINDOW chat_window AS (PARTITION BY chat_id), chat_window_ordered AS (PARTITION BY chat_id ORDER BY row_num ASC)))
  144.  
  145. -- основная часть запроса. multiIf применяется тогда, когда в рамках chat_id недостаточно заполнить так: (* * 10 * 15 *) --> (* * 10 10 15 15),
  146. -- и надо сделать bfill: --> (10 10 10 10 15 15) (звёздочки - это пропуски). если в рамках chat_id нет ни одного ненулевого значения, то во всех строчках остаётся пропуск
  147. -- INSERT INTO INTELLIGENCE.INTELLIGENCE_MOS_AISUZ_chatMessages
  148.     SELECT message_id, chat_id, session_id, timestamp, open_time, close_time,
  149.            ifNull(current_operator, lag_current_operator) AS current_operator,
  150.            multiIf(session_data_operator_lead_offset > 0, lead_session_data_operator,
  151.                    isNotNull(session_data_operator), session_data_operator,
  152.                    lag_session_data_operator) AS session_data_operator,
  153.            multiIf(connector_id_lead_offset > 0, lead_connector_id,
  154.                    isNotNull(connector_id), connector_id,
  155.                    lag_connector_id) AS connector_id,
  156.            multiIf(channel_id_lead_offset > 0, lead_channel_id,
  157.                    isNotNull(channel_id), channel_id,
  158.                    lag_channel_id) AS channel_id,
  159.            multiIf(url_location_lead_offset > 0, lead_url_location,
  160.                    isNotNull(url_location), url_location,
  161.                    lag_url_location) AS url_location,
  162.            multiIf(rate_lead_offset > 0, lead_rate,
  163.                    isNotNull(rate), rate,
  164.                    lag_rate) AS rate,
  165.            multiIf(crafttalk_dialog_id_lead_offset > 0, lead_crafttalk_dialog_id,
  166.                    isNotNull(crafttalk_dialog_id), crafttalk_dialog_id,
  167.                    lag_crafttalk_dialog_id) AS crafttalk_dialog_id,                    
  168.            multiIf(client_id_lead_offset > 0, lead_client_id,
  169.                    isNotNull(client_id), client_id,
  170.                    lag_client_id) AS client_id,
  171.            multiIf(mobile_profile_id_lead_offset > 0, lead_mobile_profile_id,
  172.                    isNotNull(mobile_profile_id), mobile_profile_id,
  173.                    lag_mobile_profile_id) AS mobile_profile_id,
  174.            multiIf(elk_person_id_lead_offset > 0, lead_elk_person_id,
  175.                    isNotNull(elk_person_id), elk_person_id,
  176.                    lag_elk_person_id) AS elk_person_id,
  177.            multiIf(mos_id_lead_offset > 0, lead_mos_id,
  178.                    isNotNull(mos_id), mos_id,
  179.                    lag_mos_id) AS mos_id,
  180.            multiIf(sso_id_lead_offset > 0, lead_sso_id,
  181.                    isNotNull(sso_id), sso_id,
  182.                    lag_sso_id) AS sso_id,
  183.            multiIf(user_agent_lead_offset > 0, lead_user_agent,
  184.                    isNotNull(user_agent), user_agent,
  185.                    lag_user_agent) AS user_agent,                
  186.            from, type, action, reply_to, card_id, card_type, card_from, text
  187.            
  188.       FROM (SELECT message_id, chat_id, session_id, timestamp, current_operator, from, type, action, reply_to, card_id, card_type, card_from, text,
  189.                    session_data_operator, session_data_operator_lead_offset, connector_id, connector_id_lead_offset, channel_id, channel_id_lead_offset,
  190.                    url_location, url_location_lead_offset, rate, rate_lead_offset, crafttalk_dialog_id, crafttalk_dialog_id_lead_offset, client_id,
  191.                    client_id_lead_offset, mobile_profile_id, mobile_profile_id_lead_offset, elk_person_id, elk_person_id_lead_offset, mos_id, mos_id_lead_offset,
  192.                    sso_id, sso_id_lead_offset, user_agent, user_agent_lead_offset,
  193.                    min(timestamp) OVER chat_window_ordered AS open_time,
  194.                    max(timestamp) OVER chat_window_ordered AS close_time,
  195.                    
  196.                    lagInFrame(current_operator, current_operator_lag_offset) OVER chat_window_ordered AS lag_current_operator,                    
  197.                    leadInFrame(session_data_operator, if(session_data_operator_lead_offset < 0, 0, session_data_operator_lead_offset)) OVER chat_window_ordered AS lead_session_data_operator,
  198.                    lagInFrame(session_data_operator, session_data_operator_lag_offset) OVER chat_window_ordered AS lag_session_data_operator,                    
  199.                    leadInFrame(connector_id, if(connector_id_lead_offset < 0, 0, connector_id_lead_offset)) OVER chat_window_ordered AS lead_connector_id,
  200.                    lagInFrame(connector_id, connector_id_lag_offset) OVER chat_window_ordered AS lag_connector_id,                      
  201.                    leadInFrame(channel_id, if(channel_id_lead_offset < 0, 0, channel_id_lead_offset)) OVER chat_window_ordered AS lead_channel_id,
  202.                    lagInFrame(channel_id, channel_id_lag_offset) OVER chat_window_ordered AS lag_channel_id,                      
  203.                    leadInFrame(url_location, if(url_location_lead_offset < 0, 0, url_location_lead_offset)) OVER chat_window_ordered AS lead_url_location,
  204.                    lagInFrame(url_location, url_location_lag_offset) OVER chat_window_ordered AS lag_url_location,                      
  205.                    leadInFrame(rate, if(rate_lead_offset < 0, 0, rate_lead_offset)) OVER chat_window_ordered AS lead_rate,
  206.                    lagInFrame(rate, rate_lag_offset) OVER chat_window_ordered AS lag_rate,                      
  207.                    leadInFrame(crafttalk_dialog_id, if(crafttalk_dialog_id_lead_offset < 0, 0, crafttalk_dialog_id_lead_offset)) OVER chat_window_ordered AS lead_crafttalk_dialog_id,
  208.                    lagInFrame(crafttalk_dialog_id, crafttalk_dialog_id_lag_offset) OVER chat_window_ordered AS lag_crafttalk_dialog_id,                      
  209.                    leadInFrame(client_id, if(client_id_lead_offset < 0, 0, client_id_lead_offset)) OVER chat_window_ordered AS lead_client_id,
  210.                    lagInFrame(client_id, client_id_lag_offset) OVER chat_window_ordered AS lag_client_id,                      
  211.                    leadInFrame(mobile_profile_id, if(mobile_profile_id_lead_offset < 0, 0, mobile_profile_id_lead_offset)) OVER chat_window_ordered AS lead_mobile_profile_id,
  212.                    lagInFrame(mobile_profile_id, mobile_profile_id_lag_offset) OVER chat_window_ordered AS lag_mobile_profile_id,                      
  213.                    leadInFrame(elk_person_id, if(elk_person_id_lead_offset < 0, 0, elk_person_id_lead_offset)) OVER chat_window_ordered AS lead_elk_person_id,
  214.                    lagInFrame(elk_person_id, elk_person_id_lag_offset) OVER chat_window_ordered AS lag_elk_person_id,                      
  215.                    leadInFrame(mos_id, if(mos_id_lead_offset < 0, 0, mos_id_lead_offset)) OVER chat_window_ordered AS lead_mos_id,
  216.                    lagInFrame(mos_id, mos_id_lag_offset) OVER chat_window_ordered AS lag_mos_id,                      
  217.                    leadInFrame(sso_id, if(sso_id_lead_offset < 0, 0, sso_id_lead_offset)) OVER chat_window_ordered AS lead_sso_id,
  218.                    lagInFrame(sso_id, sso_id_lag_offset) OVER chat_window_ordered AS lag_sso_id,                      
  219.                    leadInFrame(user_agent, if(user_agent_lead_offset < 0, 0, user_agent_lead_offset)) OVER chat_window_ordered AS lead_user_agent,
  220.                    lagInFrame(user_agent, user_agent_lag_offset) OVER chat_window_ordered AS lag_user_agent          
  221.                    
  222.               FROM wide_data JOIN offsets USING message_id
  223.             WINDOW chat_window_ordered AS (PARTITION BY chat_id ORDER BY row_num ASC))
  224. """
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement