Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Load required libraries
library(DBI)
library(RMySQL)

# Init settings
# NOTE(review): credentials are hard-coded; move them to environment
# variables (Sys.getenv) or a config file kept out of version control.
conn <- dbConnect(
  RMySQL::MySQL(),
  host = "localhost",
  user = "rstudio",
  password = "rstudio",
  dbname = "rstudio"
)
baseFolder <- "~/Datafeeds"

# 1. Setting directory to FTP folder where files incoming from Adobe ----
setwd(baseFolder)

# 2. Sort files into three separate folders ----
hitData <- "./hit_data"
metaDataFolder <- "./metadata"
columnDataFolder <- "./columns"
# 3. Optimisation. Only work on files that aren't already in the DB ----
# Get a list of previously imported datafeeds so we don't put duplicate
# data into our DB.
dbResult <- dbSendQuery(conn, "SELECT DISTINCT file FROM clickstream")
previouslyImported <- dbFetch(dbResult)
dbClearResult(dbResult)

# Compare the files on disk to those in the DB.
# The pattern is escaped and anchored: the previous ".tar.gz" regex treated
# each dot as a wildcard and matched anywhere in the name.
archivesOnDisk <- list.files(baseFolder, pattern = "\\.tar\\.gz$")
filesToImport <- data.frame(
  filesOnDisk = archivesOnDisk,
  # The hit_data files extracted later are renamed <archive>.tsv, and the
  # DB `file` column stores those .tsv names - so compare on that form.
  filesToCompare = sub("\\.tar\\.gz$", ".tsv", archivesOnDisk),
  stringsAsFactors = FALSE
)

# Dataframe with just those we've not seen before.
filesToImport <- subset(
  filesToImport,
  !(filesToImport$filesToCompare %in% previouslyImported$file)
)
print("Importing the following files:")
print(filesToImport$filesOnDisk)
# 4. Extract metadata: browsers, colour depth, country, languages, etc ----
metaDataFiles = c("browser_type.tsv","browser.tsv","color_depth.tsv","connection_type.tsv","country.tsv","javascript_version.tsv","languages.tsv","operating_systems.tsv","plugins.tsv","referrer_type.tsv","resolution.tsv","search_engines.tsv")
for (archive in as.character(filesToImport$filesOnDisk)) {
  # Metadata repeats across a feed's chunks, so only the first chunk
  # ("...000000.tar.gz") is extracted - unless the metadata folder is
  # missing entirely and needs seeding.
  shouldExtract <- grepl(pattern = "000000.tar.gz", archive) || !dir.exists(metaDataFolder)
  if (shouldExtract) {
    print(paste('Extracting metadata from', archive))
    untar(archive, files = metaDataFiles, exdir = metaDataFolder)
  }
}
# 5. Importing metadata into MySQL ----
setwd(paste0(baseFolder, "/", metaDataFolder))

# Load one lookup .tsv (e.g. "browser") into MySQL as a table of the same
# name, rename the default V1/V2 columns to id/label, and index label.
loadlookup <- function(tblname) {
  df <- read.csv2(paste0(tblname, ".tsv"), sep = "\t", header = FALSE,
                  stringsAsFactors = FALSE)
  # overwrite = TRUE below replaces the table, but dropping it first keeps
  # the original explicit-reset behaviour.
  if (dbExistsTable(conn, tblname)) { dbRemoveTable(conn, tblname) }
  dbWriteTable(conn, name = tblname, value = df, row.names = FALSE,
               overwrite = TRUE, append = FALSE)
  # Capture and clear the result handle: dbSendQuery without a matching
  # dbClearResult leaks the result set and triggers RMySQL warnings.
  res <- dbSendQuery(conn, paste0("ALTER TABLE ", tblname,
    " CHANGE COLUMN V1 id BIGINT,",
    " CHANGE COLUMN V2 label varchar(255),",
    " ADD INDEX `idx_", tblname, "` USING BTREE (label)"
  ))
  dbClearResult(res)
  dbGetQuery(conn, paste0("OPTIMIZE TABLE ", tblname))
}
# Derive table names from the .tsv files and load each table exactly once.
# BUG FIX: the previous nested loop (files x tables) reloaded every lookup
# table once per file on disk; one pass over the table names suffices.
# The pattern is escaped/anchored so only true .tsv files match.
metaDataTables <- sub("\\.tsv$", "", list.files(pattern = "\\.tsv$"))
for (tbl in metaDataTables) {
  print(tbl)
  loadlookup(tbl)
}
# 6. Create some views for convenience ----
# v_browsers joins the browser lookup to its type for easy reporting.
if (!dbExistsTable(conn, "v_browsers")) {
  # Capture and clear the result handle so the DDL statement does not leave
  # an open result set behind (RMySQL warns about unclosed results).
  res <- dbSendQuery(conn, paste0("CREATE VIEW `rstudio`.`v_browsers` AS ",
    "SELECT `browser`.`id` AS `browser_id`, ",
    "`browser`.`label` AS `browser`, ",
    "`browser_type`.`label` AS `browser_type` ",
    "FROM (`browser` join `browser_type` on((`browser`.`id` = `browser_type`.`id`)))"
  ))
  dbClearResult(res)
}
# 7. Extract column data and events ----
columnData = c("column_headers.tsv","event.tsv")
for (archiveName in as.character(filesToImport$filesOnDisk)) {
  # Column headers repeat across chunks; extract from the first chunk only,
  # or from any chunk when the columns folder does not exist yet.
  needsExtract <- !dir.exists(columnDataFolder)
  if (grepl(pattern = "000000.tar.gz", archiveName) || needsExtract) {
    print(paste('Extracting column data from', archiveName))
    untar(archiveName, files = columnData, exdir = columnDataFolder)
  }
}
# 8. Extract all Hit Data and save to a separate folder ----
for (archive in as.character(filesToImport$filesOnDisk)) {
  untar(archive, files = "hit_data.tsv", exdir = hitData)
  extractedFile <- paste0(hitData, "/hit_data.tsv")
  # Rename the extracted hit_data.tsv after its enclosing archive so each
  # extraction does not overwrite the previous one.
  # fixed = TRUE treats ".tar.gz" literally; as a regex the dots would
  # match any character.
  renamedFile <- paste0(hitData, "/", sub(".tar.gz", ".tsv", archive, fixed = TRUE))
  file.rename(extractedFile, renamedFile)
}
# 9. Load clickstream data into DB ----
# Set directory to avoid having to use paste to build paths.
setwd(paste0(baseFolder, "/", hitData))

# Set column headers for server calls ----
# column_headers.tsv holds the header row; colnames() already returns a
# character vector (the former unlist() was redundant), and nrows = 1
# avoids scanning beyond the first line.
column_headers <- colnames(read.delim(
  paste0(baseFolder, "/", columnDataFolder, "/", "column_headers.tsv"),
  stringsAsFactors = FALSE,
  nrows = 1
))
# Extra column recording which datafeed each row came from (used by the
# duplicate-import check at the top of the script).
column_headers <- append(column_headers, "file")

# During testing, always drop the clickstream table. ----
#if(dbExistsTable(conn, "clickstream")){dbRemoveTable(conn, "clickstream")}
# 10. Loop over list of files selected for import ----
# BUG FIX: this previously iterated filesToImport$file, a column that does
# not exist (columns are filesOnDisk / filesToCompare), so the loop body
# never ran and no data was imported. The extracted hit-data files were
# renamed to the .tsv (filesToCompare) names, so iterate those.
for (file in as.character(filesToImport$filesToCompare)) {
  print(file)
  # During testing, just use first 5 rows of data so things load faster.
  # hit_data = read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character", nrows=5)
  # Use the below line for prod
  hit_data <- read.csv2(file, sep = "\t", header = FALSE,
                        stringsAsFactors = FALSE, colClasses = "character")
  # Add the filename so future runs can skip already-imported feeds.
  hit_data$file <- file
  colnames(hit_data) <- column_headers
  # Drop the numerous evar/prop custom-variable columns to keep the table
  # manageable.
  hit_data <- hit_data[, !grepl("^(evar|prop)", names(hit_data))]
  # The former col.names = column_headers argument is dropped: after the
  # evar/prop columns are removed it no longer matches the data frame, and
  # the frame's own column names are already correct.
  dbWriteTable(conn, name = "clickstream", value = hit_data,
               append = TRUE, overwrite = FALSE)
  rm(hit_data)
}
# Run OPTIMIZE in MySQL so that the query planner has accurate information.
dbGetQuery(conn, "OPTIMIZE TABLE clickstream")

# Remove the extracted hit_data .tsv files (the .tar.gz archives remain so
# the feed can be re-extracted if needed).
setwd(paste0(baseFolder, "/", hitData))
# file.remove is vectorised, so no loop is needed; the pattern is escaped
# and anchored ("*.tsv" was a glob mistakenly used as a regex).
file.remove(list.files(pattern = "\\.tsv$"))

dbDisconnect(conn)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement