Advertisement
Guest User

Untitled

a guest
Aug 11th, 2016
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.08 KB | None | 0 0
  1. # Load required libraries
  2. library(DBI)
  3. library(RMySQL)
  4.  
  # Init settings ----
  # NOTE(review): the credentials are hard-coded below; consider moving them
  # to environment variables or a MySQL option file.
  conn <- dbConnect(
    RMySQL::MySQL(),
    host = "localhost",
    user = "rstudio",
    password = "rstudio",
    dbname = "rstudio"
  )
  baseFolder <- "~/Datafeeds"

  # 1. Setting directory to FTP folder where files incoming from Adobe ----
  setwd(baseFolder)

  # 2. Sort files into three separate folders ----
  hitData <- "./hit_data"
  metaDataFolder <- "./metadata"
  columnDataFolder <- "./columns"

  # 3. Optimisation. Only work on files that aren't already in the DB ----

  # Get a list of previously imported datafeeds so we don't put duplicate
  # data into our DB. On a first run (or after the table was dropped) the
  # clickstream table does not exist yet, so treat that as "nothing imported"
  # instead of failing on the SELECT.
  if (dbExistsTable(conn, "clickstream")) {
    previouslyImported <- dbGetQuery(
      conn, "SELECT DISTINCT file FROM clickstream"
    )
  } else {
    previouslyImported <- data.frame(
      file = character(0), stringsAsFactors = FALSE
    )
  }

  # Compare the files on disk to those in the DB.
  # `pattern` is a regular expression, so escape the dots and anchor the
  # suffix; only names that really end in ".tar.gz" are picked up.
  archivesOnDisk <- list.files(baseFolder, pattern = "\\.tar\\.gz$")
  filesToImport <- data.frame(
    filesOnDisk = archivesOnDisk,
    # Name under which the extracted hit data is stored in the `file` column.
    filesToCompare = sub("\\.tar\\.gz$", ".tsv", archivesOnDisk),
    stringsAsFactors = FALSE
  )

  # Dataframe with just those we've not seen before.
  notYetImported <- !(filesToImport$filesToCompare %in% previouslyImported$file)
  filesToImport <- filesToImport[notYetImported, , drop = FALSE]

  print("Importing the following files:")
  print(filesToImport$filesOnDisk)
  35.  
  # 4. Extract metadata; browsers, colour depth, country, languages, etc ----
  metaDataFiles <- c(
    "browser_type.tsv", "browser.tsv", "color_depth.tsv",
    "connection_type.tsv", "country.tsv", "javascript_version.tsv",
    "languages.tsv", "operating_systems.tsv", "plugins.tsv",
    "referrer_type.tsv", "resolution.tsv", "search_engines.tsv"
  )

  # Lookup files are taken from archives whose name contains
  # "000000.tar.gz", or from the first archive seen when the metadata folder
  # does not exist yet. A plain loop is used because only the side effect
  # matters; a top-level lapply() would also echo its result list.
  for (archive in as.character(filesToImport$filesOnDisk)) {
    # fixed = TRUE: match the literal text; the dots are not regex wildcards.
    isMetadataSource <- grepl("000000.tar.gz", archive, fixed = TRUE)
    if (isMetadataSource || !dir.exists(metaDataFolder)) {
      print(paste("Extracting metadata from", archive))
      untar(archive, files = metaDataFiles, exdir = metaDataFolder)
    }
  }
  45.  
  # 5. Importing metadata into MySQL ----
  setwd(file.path(baseFolder, metaDataFolder))

  # Load one lookup file into a MySQL table of the same name.
  #
  # Reads "<tblname>.tsv" from the current working directory, replaces any
  # existing table called `tblname` with its contents, renames the two
  # columns to `id` / `label`, indexes `label` and optimizes the table.
  #
  # tblname: lookup name without the ".tsv" extension; it is pasted into the
  #          SQL unquoted, so it must be a plain identifier.
  # Returns: the result of the final OPTIMIZE TABLE query.
  #
  # Uses the global connection `conn`.
  # NOTE(review): assumes each lookup file has exactly two columns (read as
  # V1 and V2) - confirm against the feed contents.
  loadlookup <- function(tblname) {
    df <- read.csv2(
      paste0(tblname, ".tsv"),
      sep = "\t", header = FALSE, stringsAsFactors = FALSE
    )

    # overwrite = TRUE already replaces an existing table, so no separate
    # dbRemoveTable() step is needed.
    dbWriteTable(
      conn,
      name = tblname, value = df,
      row.names = FALSE, overwrite = TRUE, append = FALSE
    )

    # dbExecute() runs the statement and releases its result; the previous
    # dbSendQuery() call left an uncleared result on the connection.
    dbExecute(conn, paste0(
      "ALTER TABLE ", tblname,
      " CHANGE COLUMN V1 id BIGINT,",
      " CHANGE COLUMN V2 label varchar(255),",
      " ADD INDEX `idx_", tblname, "` USING BTREE (label)"
    ))

    dbGetQuery(conn, paste0("OPTIMIZE TABLE ", tblname))
  }
  60.  
  # Every .tsv in the metadata folder becomes one lookup table named after
  # the file (extension removed). `pattern` is a regular expression, so the
  # extension is escaped and anchored rather than written as a glob.
  metaDataTables <- sub("\\.tsv$", "", list.files(pattern = "\\.tsv$"))

  # Load each lookup exactly once. The previous nested loop reloaded all
  # tables once per file, i.e. n * n loads for n lookup files.
  for (tbl in metaDataTables) {
    print(paste0(tbl, ".tsv"))
    loadlookup(tbl)
  }
  69.  
  # 6. Create some views for convenience ----
  # v_browsers joins the browser lookup to the browser_type lookup on id.
  # It is only created when no table or view with that name exists yet.
  if (!dbExistsTable(conn, "v_browsers")) {
    # dbExecute() releases the statement's result, unlike a bare
    # dbSendQuery() whose result was never cleared.
    dbExecute(conn, paste0(
      "CREATE VIEW `rstudio`.`v_browsers` AS ",
      "SELECT `browser`.`id` AS `browser_id`, ",
      "`browser`.`label` AS `browser`, ",
      "`browser_type`.`label` AS `browser_type` ",
      "FROM (`browser` join `browser_type` on((`browser`.`id` = `browser_type`.`id`)))"
    ))
  }
  79.  
  # 7. Extract column data and events ----
  columnData <- c("column_headers.tsv", "event.tsv")

  # Column definitions are taken from archives whose name contains
  # "000000.tar.gz", or from the first archive seen when the column folder
  # does not exist yet. A plain loop is used because only the side effect
  # matters; a top-level lapply() would also echo its result list.
  for (archive in as.character(filesToImport$filesOnDisk)) {
    # fixed = TRUE: match the literal text; the dots are not regex wildcards.
    isColumnSource <- grepl("000000.tar.gz", archive, fixed = TRUE)
    if (isColumnSource || !dir.exists(columnDataFolder)) {
      print(paste("Extracting column data from", archive))
      untar(archive, files = columnData, exdir = columnDataFolder)
    }
  }
  88.  
  # 8. Extract all Hit Data and save to a separate folder ----
  # Every archive unpacks to the same "hit_data.tsv", so each one is
  # extracted and immediately renamed before the next archive is handled.
  for (i in seq_len(nrow(filesToImport))) {
    archive <- as.character(filesToImport$filesOnDisk[i])
    untar(archive, files = "hit_data.tsv", exdir = hitData)

    # Rename the hit_data file to match the enclosing archive. The target
    # name is taken from filesToCompare rather than recomputed, so it is
    # guaranteed to be the same name used for the duplicate check.
    extractedFile <- file.path(hitData, "hit_data.tsv")
    targetFile <- file.path(
      hitData, as.character(filesToImport$filesToCompare[i])
    )
    file.rename(extractedFile, targetFile)
  }
  97.  
  # 9. Load clickstream data into DB ----
  # Work from inside the hit data folder so files can be read by bare name.
  setwd(file.path(baseFolder, hitData))

  # Set column headers for server calls ----
  # column_headers.tsv is read with its first line as the header; the
  # resulting column names, plus a trailing "file" column for the source
  # file name, become the clickstream column names.
  column_headers <- c(
    names(read.delim(
      file.path(baseFolder, columnDataFolder, "column_headers.tsv"),
      stringsAsFactors = FALSE
    )),
    "file"
  )
  108.  
  109.  
  # During testing, always drop the clickstream table. ----
  # if (dbExistsTable(conn, "clickstream")) dbRemoveTable(conn, "clickstream")

  # 10. Loop over list of files selected for import ----
  # The extracted hit data files carry the names held in `filesToCompare`.
  # The column must be spelled out in full: `filesToImport$file` partially
  # matches both `filesOnDisk` and `filesToCompare`, so `$` returns NULL and
  # the loop body would never run.
  for (file in as.character(filesToImport$filesToCompare)) {
    print(file)
    # During testing, add nrows = 5 so things load faster.
    # NOTE(review): read.csv2() treats '"' as a quote character; if hit data
    # can contain literal double quotes, quote = "" may be needed - confirm.
    hit_data <- read.csv2(
      file,
      sep = "\t", header = FALSE, stringsAsFactors = FALSE,
      colClasses = "character"
    )

    # Add the filename as the last column, matching the trailing "file"
    # entry in column_headers.
    hit_data$file <- file

    # Fail with a clear message instead of silently mislabelling columns.
    if (ncol(hit_data) != length(column_headers)) {
      stop(
        "Column count mismatch in ", file, ": ", ncol(hit_data),
        " columns read, ", length(column_headers), " headers expected",
        call. = FALSE
      )
    }
    colnames(hit_data) <- column_headers

    # Drop every evar* / prop* column before loading.
    keepColumn <- !grepl("^(evar|prop)", names(hit_data))
    hit_data <- hit_data[, keepColumn, drop = FALSE]

    # The data frame's own names define the table columns; the former
    # col.names argument held the unfiltered header list and is dropped.
    dbWriteTable(
      conn,
      name = "clickstream", value = hit_data,
      append = TRUE, overwrite = FALSE
    )
    rm(hit_data)
  }
  127.  
  # Run OPTIMIZE TABLE in MySQL after the bulk load.
  # NOTE(review): the original intent was to refresh statistics for the
  # query planner; whether OPTIMIZE does that depends on the storage engine
  # - confirm, or use ANALYZE TABLE if only statistics are wanted.
  dbGetQuery(conn, "OPTIMIZE TABLE clickstream")

  # Remove hit_data files.
  setwd(file.path(baseFolder, hitData))
  # `pattern` is a regular expression, not a glob: escape the dot and anchor
  # the extension so only names ending in ".tsv" are deleted.
  # file.remove() is vectorised, so no loop is needed.
  invisible(file.remove(list.files(pattern = "\\.tsv$")))

  dbDisconnect(conn)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement