Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- My current solution is to use something like the query below within a dataflow to incrementally refresh data into a Lakehouse. Any feedback and suggestions are welcome.
let
    // Parameters (adjust these for each table)
    ServerName = "svrname",
    DatabaseName = "dbname",
    SchemaName = "schema",
    TableName = "tblename",
    PrimaryKeyName = "PK",
    WorkspaceId = "WorkspaceId",
    LakehouseId = "LakehouseId",

    // Watermark used when the Lakehouse table is missing, lacks an UpdatedDate
    // column, or holds no non-null UpdatedDate values (e.g. the very first load).
    DefaultLastUpdatedDate = #datetime(2000, 1, 1, 0, 0, 0),

    // Builds the Lakehouse table name as "<server>_<database>_<schema>_<table>".
    GetLakehouseTableId = (server as text, database as text, schema as text, table as text) as text =>
        server & "_" & database & "_" & schema & "_" & table,

    LakehouseTableId = GetLakehouseTableId(ServerName, DatabaseName, SchemaName, TableName),

    // Navigates to a table inside the given Lakehouse.
    // Returns null when no table with that Id exists yet (e.g. before the first
    // load) instead of failing on a key lookup, so callers can fall back cleanly.
    GetLakehouseTable = (workspaceId as text, lakehouseId as text, tableId as text) as nullable table =>
        let
            Source = Lakehouse.Contents([]),
            Workspace = Source{[workspaceId = workspaceId]}[Data],
            LakehouseNav = Workspace{[lakehouseId = lakehouseId]}[Data],
            Matches = Table.SelectRows(LakehouseNav, each [Id] = tableId and [ItemKind] = "Table")
        in
            if Table.IsEmpty(Matches) then null else Matches{0}[Data],

    // Returns the latest non-null UpdatedDate stored in the Lakehouse table, or
    // DefaultLastUpdatedDate when there is nothing to take a maximum over.
    GetLastUpdatedDate = (workspaceId as text, lakehouseId as text, tableId as text) =>
        let
            TableData = GetLakehouseTable(workspaceId, lakehouseId, tableId),
            // `and` short-circuits, so Table.HasColumns is never called on null.
            UpdatedDates =
                if TableData <> null and Table.HasColumns(TableData, {"UpdatedDate"}) then
                    List.RemoveNulls(Table.Column(TableData, "UpdatedDate"))
                else
                    {},
            // List.Max returns null for an empty list.
            MaxUpdatedDate = List.Max(UpdatedDates)
        in
            if MaxUpdatedDate = null then DefaultLastUpdatedDate else MaxUpdatedDate,

    LastUpdatedDate = GetLastUpdatedDate(WorkspaceId, LakehouseId, LakehouseTableId),

    // Reads the rows of schema.table whose UpdatedDate is strictly after the
    // watermark. The filter is a plain column comparison so it can fold into
    // the SQL source query.
    GetUpdatedRecords = (server as text, database as text, schema as text, table as text, lastUpdatedDate as datetime) as table =>
        let
            Source = Sql.Database(server, database),
            TableData = Source{[Schema = schema, Item = table]}[Data],
            FilteredRows = Table.SelectRows(TableData, each [UpdatedDate] > lastUpdatedDate)
        in
            FilteredRows,

    NewRecords = GetUpdatedRecords(ServerName, DatabaseName, SchemaName, TableName, LastUpdatedDate),

    // Distinct primary-key values among the changed source rows.
    NewPrimaryKeys = List.Distinct(Table.Column(NewRecords, PrimaryKeyName)),

    // Returns a one-column table (named primaryKeyName) holding the keys from
    // primaryKeyValues that already exist in the Lakehouse table.
    // When the table is missing or lacks the key column, the result is an empty
    // table that still has the key column, so Table.Column on it cannot fail.
    GetExistingPrimaryKeys = (workspaceId as text, lakehouseId as text, tableId as text, primaryKeyName as text, primaryKeyValues as list) as table =>
        let
            TableData = GetLakehouseTable(workspaceId, lakehouseId, tableId),
            ExistingKeys =
                if TableData <> null and Table.HasColumns(TableData, {primaryKeyName}) then
                    Table.SelectRows(
                        Table.SelectColumns(TableData, {primaryKeyName}),
                        each List.Contains(primaryKeyValues, Record.Field(_, primaryKeyName))
                    )
                else
                    #table({primaryKeyName}, {})
        in
            ExistingKeys,

    ExistingPrimaryKeysTable = GetExistingPrimaryKeys(WorkspaceId, LakehouseId, LakehouseTableId, PrimaryKeyName, NewPrimaryKeys),

    // Buffer the existing keys once so the per-row membership test below works
    // against an in-memory list.
    ExistingPrimaryKeysList = List.Buffer(Table.Column(ExistingPrimaryKeysTable, PrimaryKeyName)),

    // Keep only source rows whose key is not yet present in the Lakehouse.
    // NOTE(review): rows that were modified in the source but whose key already
    // exists in the Lakehouse are dropped here, so this query only appends new
    // keys and never propagates updates to existing rows. Confirm this is intended.
    UpdatedRecords = Table.SelectRows(NewRecords, each not List.Contains(ExistingPrimaryKeysList, Record.Field(_, PrimaryKeyName)))
in
    UpdatedRecords
Advertisement
Add Comment
Please, Sign In to add comment