Guest User

Untitled

a guest
Oct 10th, 2024
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.73 KB | None | 0 0
  // Current solution: use a query like this within a dataflow to incrementally
  // refresh a SQL table into a Lakehouse table. Any feedback and suggestions are welcome.
  //
  // Overview of what the query does:
  //   1. Reads the largest UpdatedDate already present in the Lakehouse table (the watermark).
  //   2. Selects the SQL rows whose UpdatedDate is greater than that watermark.
  //   3. Drops every selected row whose primary key already exists in the Lakehouse table.
  //
  // NOTE(review): because of step 3, a source row that was modified but whose key is
  // already in the Lakehouse table is NOT returned; only rows with new keys are.
  // Confirm this is the intended behaviour for the destination's update method.
  let
      // Parameters (adjust these for each table)
      ServerName = "svrname",
      DatabaseName = "dbname",
      SchemaName = "schema",
      TableName = "tblename",
      PrimaryKeyName = "PK",
      WorkspaceId = "WorkspaceId",
      LakehouseId = "LakehouseId",

      // Watermark used when the Lakehouse table has no usable UpdatedDate value yet
      DefaultLastUpdatedDate = #datetime(2000, 1, 1, 0, 0, 0),

      // Builds the Lakehouse table ID as "<server>_<database>_<schema>_<table>"
      GetLakehouseTableId = (server as text, database as text, schema as text, table as text) =>
          server & "_" & database & "_" & schema & "_" & table,

      // Lakehouse table ID for the configured source table
      LakehouseTableId = GetLakehouseTableId(ServerName, DatabaseName, SchemaName, TableName),

      // Navigates workspace -> lakehouse -> table and returns the table's data.
      // Shared by the watermark lookup and the existing-key lookup so the navigation
      // steps live in one place. Each lookup is a key match without "?", so it raises
      // an error when no matching entry exists.
      GetLakehouseTable = (workspaceId as text, lakehouseId as text, tableId as text) =>
          let
              Source = Lakehouse.Contents([]),
              Workspace = Source{[workspaceId = workspaceId]}[Data],
              Lakehouse = Workspace{[lakehouseId = lakehouseId]}[Data],
              TableData = Lakehouse{[Id = tableId, ItemKind = "Table"]}[Data]
          in
              TableData,

      // Returns the largest UpdatedDate stored in the Lakehouse table, or
      // DefaultLastUpdatedDate when the column is missing, the table is empty,
      // or every UpdatedDate is null.
      GetLastUpdatedDate = (workspaceId as text, lakehouseId as text, tableId as text) =>
          let
              TableData = GetLakehouseTable(workspaceId, lakehouseId, tableId),
              // List.Max yields null for an empty list, which covers the empty-table case
              MaxUpdatedDate =
                  if Table.HasColumns(TableData, {"UpdatedDate"}) then
                      List.Max(Table.Column(TableData, "UpdatedDate"))
                  else
                      null,
              // A null watermark would be rejected by the "as datetime" parameter of
              // GetUpdatedRecords, so substitute the default past date instead
              LastUpdatedDateValue =
                  if MaxUpdatedDate = null then DefaultLastUpdatedDate else MaxUpdatedDate
          in
              LastUpdatedDateValue,

      // Watermark for the configured table
      LastUpdatedDate = GetLastUpdatedDate(WorkspaceId, LakehouseId, LakehouseTableId),

      // Returns the rows of the SQL table whose UpdatedDate is strictly greater than
      // lastUpdatedDate. The filter is a plain Table.SelectRows on the source table,
      // written with the intent that it folds to the database.
      GetUpdatedRecords = (server as text, database as text, schema as text, table as text, lastUpdatedDate as datetime) =>
          let
              Source = Sql.Database(server, database),
              TableData = Source{[Schema = schema, Item = table]}[Data],
              FilteredRows = Table.SelectRows(TableData, each [UpdatedDate] > lastUpdatedDate)
          in
              FilteredRows,

      // Rows changed in the SQL source since the watermark
      NewRecords = GetUpdatedRecords(ServerName, DatabaseName, SchemaName, TableName, LastUpdatedDate),

      // Distinct primary keys of NewRecords. Buffered because the list is probed once
      // per Lakehouse row below; without the buffer the lazy list may be re-evaluated
      // against the SQL source on each probe.
      NewPrimaryKeys = List.Buffer(List.Distinct(Table.Column(NewRecords, PrimaryKeyName))),

      // Returns a single-column table (named primaryKeyName) holding the keys from
      // primaryKeyValues that already exist in the Lakehouse table. When the Lakehouse
      // table has no such column, the result is an empty table that still has the key
      // column, so callers can read that column without a missing-column error.
      GetExistingPrimaryKeys = (workspaceId as text, lakehouseId as text, tableId as text, primaryKeyName as text, primaryKeyValues as list) =>
          let
              TableData = GetLakehouseTable(workspaceId, lakehouseId, tableId),
              ExistingKeys =
                  if Table.HasColumns(TableData, {primaryKeyName}) then
                      // Keep only the key column, then only the keys present in primaryKeyValues
                      Table.SelectRows(
                          Table.SelectColumns(TableData, {primaryKeyName}),
                          each List.Contains(primaryKeyValues, Record.Field(_, primaryKeyName))
                      )
                  else
                      #table({primaryKeyName}, {})
          in
              ExistingKeys,

      // Keys from NewRecords that are already present in the Lakehouse table
      ExistingPrimaryKeysTable = GetExistingPrimaryKeys(WorkspaceId, LakehouseId, LakehouseTableId, PrimaryKeyName, NewPrimaryKeys),

      // Buffered list of those keys, probed once per row of NewRecords below
      ExistingPrimaryKeysList = List.Buffer(Table.Column(ExistingPrimaryKeysTable, PrimaryKeyName)),

      // Exclude rows whose primary key already exists in the Lakehouse table.
      // This is the last variable of the let, so it must not end with a comma.
      UpdatedRecords = Table.SelectRows(NewRecords, each not List.Contains(ExistingPrimaryKeysList, Record.Field(_, PrimaryKeyName)))
  in
      UpdatedRecords
Advertisement
Add Comment
Please, Sign In to add comment