Advertisement
sesquiipedalian

Untitled

Aug 27th, 2023
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 27.29 KB | None | 0 0
  1. def UGC_moderation(*args, **kwargs):
  2. from MPA_Tools.airflow_tools import extract_connections_from_json
  3. import vertica_python
  4. from datetime import datetime, timedelta, date
  5. import pandas as pd
  6.  
  7.  
  8. connections = extract_connections_from_json(kwargs['connections'])
  9. connection_vertica = connections["content_analytics_vertica"]
  10.  
  11. vertica_conn_info = {'host': str(connection_vertica.host), 'port': str(connection_vertica.port),
  12. 'database': str(connection_vertica.schema),
  13. 'user': str(connection_vertica.login), 'password': str(connection_vertica.password),
  14. 'read_timeout': 600, 'unicode_error': 'strict', 'ssl': False}
  15.  
  16. vertica_conn = vertica_python.connect(**vertica_conn_info)
  17.  
  18. START_DATE = '2023-01-01'
  19. # START_DATE = '2023-08-16'
  20.  
  21. END_DATE = date.today() - timedelta(1)
  22.  
  23. # START_DATE = END_DATE - timedelta(4)
  24.  
  25. cur = vertica_conn.cursor()
  26.  
  27. with vertica_python.connect(**vertica_conn_info) as connection:
  28. cur = connection.cursor('dict')
  29.  
  30.  
  31. # 99079191820006748
  32. # +-------------+----------+--------+
  33. # |cpu_time_hour|network_gb|spill_gb|
  34. # +-------------+----------+--------+
  35. # |0.3614 |24.9210 |0.0000 |
  36. # +-------------+----------+--------+
  37. # +-----------+-------+
  38. # |p95_div_p50|max_cpu|
  39. # +-----------+-------+
  40. # |1.15 |24.34 |
  41. # +-----------+-------+
  42.  
  43. cur.execute(f"""
  44. create local temporary table entity_uuid_sku on commit preserve rows as (
  45. (select distinct review_uuid as entity_uuid,
  46. item_id as sku
  47. from bx_rp_product.review as r
  48. join bx_rp_cm.status_history as sh
  49. on r.review_uuid = sh.external_entity_uuid
  50. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}')
  51. union
  52. (select distinct review_uuid as entity_uuid,
  53. item_id as sku
  54. from bx_rp_product.review as r
  55. join bx_rp_cm.status_history as sh
  56. on r.review_uuid = sh.external_entity_root_parent_uuid
  57. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}')
  58. ) order by sku segmented by hash(sku) all nodes;
  59. """)
  60.  
  61. print(1)
  62.  
  63. # 283726776541533075
  64. # +-------------+----------+--------+
  65. # |cpu_time_hour|network_gb|spill_gb|
  66. # +-------------+----------+--------+
  67. # |0.0545 |0.3553 |0.0000 |
  68. # +-------------+----------+--------+
  69. # +-----------+-------+
  70. # |p95_div_p50|max_cpu|
  71. # +-----------+-------+
  72. # |1.24 |4 |
  73. # +-----------+-------+
  74.  
  75. cur.execute(f"""
  76. create local temporary table entity_id_sku on commit preserve rows as (
  77. (select distinct iq.SourceKey as entity_id,
  78. i.SourceKey as sku
  79. from dwh_data.Anc_ItemQuestion as iq
  80. join dwh_data.Tie_ItemQuestion_Item as iq_i
  81. on iq.ItemQuestionId = iq_i.ItemQuestionId
  82. join dwh_data.Anc_Item as i
  83. on iq_i.ItemId = i.ItemId
  84. join bx_rp_cm.status_history as sh
  85. on iq.SourceKey = sh.external_entity_id
  86. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}')
  87. union
  88. (select distinct ia.SourceKey as entity_id,
  89. i.SourceKey as sku
  90. from dwh_data.Anc_ItemAnswer as ia
  91. join dwh_data.Tie_ItemAnswer_ItemQuestion as ia_iq
  92. on ia.ItemAnswerId = ia_iq.ItemAnswerId
  93. join dwh_data.Anc_ItemQuestion as iq
  94. on ia_iq.ItemQuestionId = iq.ItemQuestionId
  95. join dwh_data.Tie_ItemQuestion_Item as iq_i
  96. on iq.ItemQuestionId = iq_i.ItemQuestionId
  97. join dwh_data.Anc_Item as i
  98. on iq_i.ItemId = i.ItemId
  99. join bx_rp_cm.status_history as sh
  100. on ia.SourceKey = sh.external_entity_id
  101. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}')
  102. ) order by sku segmented by hash(sku) all nodes;
  103. """)
  104.  
  105.  
  106. print(2)
  107.  
  108. # 108086391075334458
  109. # +-------------+----------+--------+
  110. # |cpu_time_hour|network_gb|spill_gb|
  111. # +-------------+----------+--------+
  112. # |0.3419 |3.4550 |0.0000 |
  113. # +-------------+----------+--------+
  114. # +-----------+-------+
  115. # |p95_div_p50|max_cpu|
  116. # +-----------+-------+
  117. # |1.16 |24.02 |
  118. # +-----------+-------+
  119.  
  120. cur.execute(f"""
  121. create local temporary table sku_crossborder on commit preserve rows as
  122. select distinct i.SourceKey as Sku,
  123. case
  124. when s.SourceKey = 0 then false -- 1P товары
  125. else si.IsCrossborder
  126. end as is_crossborder
  127. from dwh_data.Anc_Seller as s
  128. left join dwh_data.Atr_Seller_IsCrossborder as si
  129. on si.SellerId = s.SellerId
  130. join dwh_data.Tie_Item_Seller as i_s
  131. on s.SellerId = i_s.SellerId
  132. join dwh_data.anc_item as i
  133. on i_s.ItemId = i.ItemId
  134. where true
  135. and i.SourceKey in (select distinct sku
  136. from entity_id_sku
  137. union
  138. select distinct sku
  139. from entity_uuid_sku)
  140. order by Sku segmented by hash(Sku) all nodes;
  141. """)
  142.  
  143. print(3)
  144.  
  145. cur.execute(f"""
  146. create local temporary table tmp_versions on commit preserve rows as
  147. select external_entity_uuid,
  148. external_entity_version,
  149. max(created_at) as version_last_moderation,
  150. lag(external_entity_version) over (partition by external_entity_uuid
  151. order by external_entity_version) as prev_version
  152. from bx_rp_cm.status_history as sh
  153. where true
  154. and external_entity_type = 1
  155. and created_at between '{START_DATE}' and '{END_DATE}'
  156. group by external_entity_uuid,
  157. external_entity_version
  158. segmented by hash(external_entity_uuid) all nodes;
  159. """)
  160.  
  161. print(4)
  162.  
  163. cur.execute(f"""
  164. create local temporary table tmp_prevmoderations on commit preserve rows as
  165. select external_entity_uuid,
  166. external_entity_version,
  167. max(created_at) as last_moderation
  168. from bx_rp_cm.status_history as sh
  169. where true
  170. and external_entity_type = 1
  171. and created_at between '{START_DATE}' and '{END_DATE}'
  172. group by external_entity_uuid, external_entity_version
  173. segmented by hash(external_entity_uuid) all nodes;
  174. """)
  175.  
  176. print(5)
  177.  
  178. cur.execute(f"""
  179. create local temporary table review_versions on commit preserve rows as
  180. select v.external_entity_uuid as review_uuid,
  181. v.external_entity_version as version,
  182. v.version_last_moderation,
  183. prev_m.last_moderation as prev_version_last_moderation
  184. from tmp_versions v
  185. left join tmp_prevmoderations prev_m
  186. on v.external_entity_uuid = prev_m.external_entity_uuid
  187. and v.prev_version = prev_m.external_entity_version
  188. order by version desc
  189. segmented by hash(review_uuid) all nodes;
  190. """)
  191.  
  192. print(6)
  193.  
  194. cur.execute(f"""
  195. create local temporary table media_versions on commit preserve rows as
  196. select r.review_uuid,
  197. sh.external_entity_uuid as media_uuid,
  198. r.prev_version_last_moderation,
  199. r.version_last_moderation,
  200. sh.created_at,
  201. r.version as review_version,
  202. sh.external_entity_type as entity_type,
  203. sh.user_id
  204. from bx_rp_cm.status_history as sh
  205. left join review_versions as r
  206. on sh.external_entity_root_parent_uuid = r.review_uuid
  207. where true
  208. and sh.external_entity_type in (2, 6, 9, 10)
  209. and (prev_version_last_moderation < sh.created_at and sh.created_at <= version_last_moderation)
  210. or (prev_version_last_moderation IS NULL and sh.created_at <= version_last_moderation)
  211. and date(version_last_moderation) between '{START_DATE}' and '{END_DATE}'
  212. order by r.version desc
  213. segmented by hash(review_uuid) all nodes;
  214. """)
  215.  
  216. print(7)
  217.  
  218. cur.execute(f"""
  219. create local temporary table final_payout on commit preserve rows as
  220. select
  221. entity_uuid,
  222. entity_version,
  223. entity_type_id,
  224. null as entity_parent_root_uuid,
  225. null as entity_parent_root,
  226. user_id,
  227. status_id,
  228. date as moderated_at
  229. from
  230. bx_rp_cm.moderator_payout
  231. where true
  232. and entity_uuid is not null
  233. and entity_type_id = 1 -- для наглядности берем только отзывы
  234. and date(date) between '{START_DATE}' and '{END_DATE}'
  235. union all
  236.  
  237. select
  238. mv.media_uuid,
  239. mv.review_version,
  240. mv.entity_type,
  241. mv.review_uuid as entity_parent_root_uuid,
  242. review_version as entity_parent_root_version,
  243. mp.user_id,
  244. mp.status_id,
  245. mp.date as moderated_at
  246.  
  247. from bx_rp_cm.moderator_payout as mp
  248. join media_versions as mv
  249. on mp.entity_uuid = mv.review_uuid
  250. and mp.entity_version = mv.review_version
  251. order by entity_uuid, entity_version;
  252. """)
  253.  
  254. print(8)
  255.  
  256. cur.execute(f"""
  257. delete from CAN_team.UGC_moderation
  258. where date(moderated) between '{START_DATE}' and '{END_DATE}'
  259. """)
  260.  
  261. print(9)
  262.  
  263.  
  264. # 108086391075334665
  265. # +-------------+----------+--------+
  266. # |cpu_time_hour|network_gb|spill_gb|
  267. # +-------------+----------+--------+
  268. # |0.1079 |44.7491 |0.8922 |
  269. # +-------------+----------+--------+
  270. # +-----------+-------+
  271. # |p95_div_p50|max_cpu|
  272. # +-----------+-------+
  273. # |1.3 |25.01 |
  274. # +-----------+-------+
  275.  
  276.  
  277. cur.execute(f"""
  278. create local temporary table table_1 on commit preserve rows as
  279. ( select timestampdiff('second', tb.created_at, next_created_at) as diff,
  280. case
  281. when user_id = 0 and (diff >= 10 * 60 or diff is null)
  282. then 'автомодерация'
  283. when us.role <> 'operator' and user_id <> 0 and (diff >= 10 * 60 or diff is null)
  284. then 'ручная неоплачиваемая модерация'
  285. when diff < 10 * 60
  286. then 'double'
  287. end as moderated,
  288. case
  289. when allow_to_publish = false then 'declined'
  290. when allow_to_publish = true then 'approved'
  291. end as status,
  292. case
  293. when tb.entity_type in (1, 2, 3, 4, 5, 6) then 0
  294. when tb.entity_type in (7, 8, 9, 10) then 1
  295. end as is_travel,
  296. euk.sku as sku,
  297. tb.created_at as moderated_at,
  298. tb.status_id,
  299. tb.request_id,
  300. tb.user_id,
  301. case
  302. when tb.entity_type in (1, 7) then 'review'
  303. when tb.entity_type in (3, 8) then 'comment'
  304. end as entity_type,
  305. tb.entity_id,
  306. tb.entity_uuid,
  307. tb.entity_root_parent_id,
  308. tb.entity_root_parent_uuid,
  309. case
  310. when sc.is_crossborder = true then 1
  311. when sc.is_crossborder = false then 0
  312. end as is_crossborder
  313.  
  314. from (select distinct
  315. sh.status_id,
  316. sh.user_id,
  317. sh.created_at,
  318. sh.external_entity_id as entity_id,
  319. sh.external_entity_type as entity_type,
  320. sh.external_entity_uuid as entity_uuid,
  321. sh.external_entity_root_parent_id as entity_root_parent_id,
  322. sh.external_entity_root_parent_uuid as entity_root_parent_uuid,
  323.  
  324. hash(sh.external_entity_uuid, sh.created_at, sh.user_id, sh.status_id) as request_id,
  325.  
  326. lead(sh.created_at, 1) over (partition by sh.external_entity_uuid,
  327. sh.external_entity_version
  328. order by sh.created_at) as next_created_at
  329.  
  330. from bx_rp_cm.status_history as sh
  331. where true
  332. and sh.external_entity_type in (1, 3, 7, 8) -- отзывы и комментарии
  333. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}'
  334. ) as tb
  335.  
  336. left join bx_rp_cm.user as us
  337. on tb.user_id = us.id
  338. left join bx_rp_product.review as rvs
  339. on tb.entity_uuid = rvs.review_uuid
  340. left join bx_rp_cm.statuses as ss
  341. on tb.status_id = ss.id and tb.entity_type = ss.external_entity_type
  342. left join entity_uuid_sku as euk
  343. on tb.entity_uuid = euk.entity_uuid
  344. left join sku_crossborder as sc
  345. on euk.sku = sc.Sku
  346. where true
  347. and us.role <> 'operator' or user_id = 0
  348. order by tb.created_at )
  349.  
  350. """)
  351.  
  352. print(10)
  353.  
  354. cur.execute(f"""
  355. create local temporary table table_2 on commit preserve rows as
  356. (select timestampdiff('second', tb.created_at, next_created_at) as diff,
  357. case
  358. when user_id = 0 and (diff >= 10 * 60 or diff is null)
  359. then 'автомодерация'
  360. when us.role <> 'operator' and user_id <> 0 and (diff >= 10 * 60 or diff is null)
  361. then 'ручная неоплачиваемая модерация'
  362. when diff < 10 * 60
  363. then 'double'
  364. end as moderated,
  365.  
  366. case
  367. when allow_to_publish = false then 'declined'
  368. when allow_to_publish = true then 'approved'
  369. end as status,
  370.  
  371. case
  372. when tb.entity_type in (1, 2, 3, 4, 5, 6) then 0
  373. when tb.entity_type in (7, 8, 9, 10) then 1
  374. end as is_travel,
  375. euk.sku as sku,
  376. tb.created_at as moderated_at,
  377. tb.status_id,
  378. tb.request_id,
  379. tb.user_id,
  380. case
  381. when tb.entity_type in (2, 9) then 'photo'
  382. when tb.entity_type in (6, 10) then 'video'
  383. end as entity_type,
  384. tb.entity_id,
  385. tb.entity_uuid,
  386. tb.entity_root_parent_id,
  387. tb.entity_root_parent_uuid,
  388. case
  389. when sc.is_crossborder = true then 1
  390. when sc.is_crossborder = false then 0
  391. end as is_crossborder
  392. from (select distinct sh.status_id,
  393. sh.user_id,
  394. sh.created_at,
  395. sh.external_entity_id as entity_id,
  396. sh.external_entity_type as entity_type,
  397. sh.external_entity_uuid as entity_uuid,
  398. sh.external_entity_root_parent_id as entity_root_parent_id,
  399. sh.external_entity_root_parent_uuid as entity_root_parent_uuid,
  400.  
  401. hash(sh.external_entity_id, sh.created_at, sh.user_id, sh.status_id) as request_id,
  402. lead(sh.created_at, 1) over (partition by sh.external_entity_uuid
  403. order by sh.created_at) as next_created_at
  404. from bx_rp_cm.status_history as sh
  405. where true
  406. and sh.external_entity_type in (2, 6, 9, 10)
  407. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}') as tb
  408. left join bx_rp_cm.user as us
  409. on tb.user_id = us.id
  410. left join bx_rp_cm.statuses as ss
  411. on tb.status_id = ss.id and tb.entity_type = ss.external_entity_type
  412. left join entity_uuid_sku as euk
  413. on tb.entity_root_parent_uuid = euk.entity_uuid
  414. left join sku_crossborder as sc
  415. on euk.sku = sc.Sku
  416. where true
  417. and us.role <> 'operator' or user_id = 0
  418. order by tb.created_at)
  419.  
  420. """)
  421.  
  422. print(11)
  423.  
  424.  
  425. cur.execute(f"""
  426.  
  427. create local temporary table table_3 on commit preserve rows as
  428.  
  429. ( select timestampdiff('second', tb.created_at, next_created_at) as diff,
  430. case
  431. when user_id = 0 and (diff >= 10 * 60 or diff is null)
  432. then 'автомодерация'
  433. when us.role <> 'operator' and user_id <> 0 and (diff >= 10 * 60 or diff is null)
  434. then 'ручная неоплачиваемая модерация'
  435. when diff < 10 * 60
  436. then 'double'
  437. end as moderated,
  438. case
  439. when allow_to_publish = false then 'declined'
  440. when allow_to_publish = true then 'approved'
  441. end as status,
  442. case
  443. when tb.entity_type in (1, 2, 3, 4, 5, 6) then 0
  444. when tb.entity_type in (7, 8, 9, 10) then 1
  445. end as is_travel,
  446. eik.sku as sku,
  447. tb.created_at as moderated_at,
  448. tb.status_id,
  449. tb.request_id,
  450. tb.user_id,
  451. case
  452. when tb.entity_type = 4 then 'answer'
  453. when tb.entity_type = 5 then 'question'
  454. end as entity_type,
  455. tb.entity_id,
  456. tb.entity_uuid,
  457. tb.entity_root_parent_id, -- для запроса без with не мешает по времени
  458. tb.entity_root_parent_uuid, -- для запроса без with не мешает по времени
  459. case
  460. when sc.is_crossborder = true then 1
  461. when sc.is_crossborder = false then 0
  462. end as is_crossborder
  463. from (select distinct
  464. sh.status_id,
  465. sh.user_id,
  466. sh.created_at,
  467. sh.external_entity_id as entity_id,
  468. sh.external_entity_type as entity_type,
  469. sh.external_entity_uuid as entity_uuid,
  470. sh.external_entity_root_parent_id as entity_root_parent_id,
  471. sh.external_entity_root_parent_uuid as entity_root_parent_uuid,
  472.  
  473. hash(sh.external_entity_id, sh.created_at, sh.user_id, sh.status_id) as request_id,
  474. lead(sh.created_at, 1) over (partition by sh.external_entity_id
  475. order by sh.created_at) as next_created_at
  476. from bx_rp_cm.status_history as sh
  477. where true
  478. and sh.external_entity_type in (4, 5)
  479. and date(sh.created_at) between '{START_DATE}' and '{END_DATE}') as tb
  480. left join bx_rp_cm.user as us
  481. on tb.user_id = us.id
  482. left join bx_rp_cm.statuses as ss
  483. on tb.status_id = ss.id and tb.entity_type = ss.external_entity_type
  484. left join entity_id_sku as eik
  485. on tb.entity_id = eik.entity_id
  486. left join sku_crossborder as sc
  487. on eik.sku = sc.sku
  488. where true
  489. and us.role <> 'operator' or user_id = 0
  490. order by tb.created_at );
  491. """)
  492.  
  493. print(12)
  494.  
  495. cur.execute(f"""
  496. create local temporary table table_4 on commit preserve rows as
  497. select
  498. null::int as diff,
  499. 'ручная оплачиваемая модерация' as moderated,
  500. case
  501. when ss.allow_to_publish = false then 'declined'
  502. when ss.allow_to_publish = true then 'approved'
  503. end as status,
  504.  
  505. case
  506. when fp.entity_type_id in (1, 2, 3, 4, 5, 6) then 0
  507. when fp.entity_type_id in (7, 8, 9, 10) then 1
  508. end as is_travel,
  509. euk.sku as sku,
  510. moderated_at,
  511. fp.status_id,
  512. hash(fp.entity_uuid, fp.moderated_at, fp.user_id, fp.status_id) as request_id,
  513. fp.user_id,
  514. case
  515. when fp.entity_type_id = 1 then 'review'
  516. when fp.entity_type_id in (2, 9) then 'photo'
  517. when fp.entity_type_id in (6, 10) then 'video'
  518. end as entity_type,
  519. null::int as entity_id,
  520. fp.entity_uuid as entity_uuid,
  521. null::int as entity_root_parent_id,
  522. fp.entity_parent_root_uuid,
  523. case
  524. when sc.is_crossborder = true then 1
  525. when sc.is_crossborder = false then 0
  526. end as is_crossborder
  527. from final_payout as fp
  528. left join bx_rp_cm.user as us
  529. on fp.user_id = us.id
  530. left join bx_rp_cm.statuses as ss
  531. on fp.status_id = ss.id and fp.entity_type_id = ss.external_entity_type
  532. left join entity_uuid_sku as euk
  533. on fp.entity_parent_root_uuid = euk.entity_uuid
  534. left join sku_crossborder as sc
  535. on euk.sku = sc.sku
  536. where fp.entity_type_id in (1, 2, 6, 7, 9, 10)
  537. order by moderated_at
  538. """)
  539.  
  540. print(13)
  541.  
  542.  
  543. cur.execute(f"""
  544. insert into CAN_team.UGC_moderation
  545. (
  546. diff,
  547. moderated,
  548. status,
  549. is_travel,
  550. sku,
  551. moderated_at,
  552. status_id,
  553. request_id,
  554. user_id,
  555. entity_type,
  556. entity_id,
  557. entity_uuid,
  558. entity_root_parent_id,
  559. entity_root_parent_uuid,
  560. is_crossborder
  561. )
  562.  
  563. select * from table_1
  564. union
  565. select * from table_2
  566. union
  567. select * from table_3
  568. union
  569. select * from table_4
  570.  
  571. """)
  572.  
  573. print(14)
  574.  
  575.  
  576.  
  577. # create table CAN_team.UGC_moderation
  578. # (
  579. # diff int,
  580. # moderated varchar(80),
  581. # status varchar(12),
  582. # IsTravel boolean,
  583. # sku int,
  584. # moderated_at timestamp,
  585. # -- rvs.published_at,
  586. # status_id int,
  587. # request_id int,
  588. # user_id int,
  589. # entity_type varchar(12),
  590. # entity_id int,
  591. # entity_uuid uuid,
  592. # entity_root_parent_id int,
  593. # entity_root_parent_uuid uuid,
  594. # -- rvs.client_id,
  595. # IsCrossborder boolean
  596. # ) order by request_id segmented by request_id all nodes;
  597.  
  598.  
  599. connection.commit()
  600. cur.close()
  601.  
  602.  
  603.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement