Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c
- --- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600
- +++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600
- @@ -398,6 +398,8 @@
- BTReaderTerm(&reader);
- }
- +/* Forward declaration: merge_tuples() is called from _bt_mergeload() before its definition. */
- +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src);
- +
- /*
- * _bt_mergeload - Merge two streams of index tuples into new index files.
- */
- @@ -462,7 +464,6 @@
- }
- else
- {
- - // TODO -- BSJ
- if (on_duplicate == ON_DUPLICATE_KEEP_NEW)
- {
- self->dup_old++;
- @@ -470,7 +471,21 @@
- RelationGetRelationName(wstate->index));
- itup2 = BTReaderGetNextItem(btspool2);
- }
- - else
- + else if (on_duplicate == ON_DUPLICATE_MERGE)
- + {
- + self->dup_old++;
- +
- + // merge from itup into itup2 where itup's col[i] is not null
- + // but itup2's col[i] IS null
- + merge_tuples(heapRel, itup2, itup);
- +
- + ItemPointerCopy(&t_tid2, &itup2->t_tid);
- + self->dup_new++;
- + remove_duplicate(self, heapRel, itup,
- + RelationGetRelationName(wstate->index));
- + itup = BTSpoolGetNextItem(btspool, itup, &should_free);
- + }
- + else
- {
- ItemPointerCopy(&t_tid2, &itup2->t_tid);
- self->dup_new++;
- @@ -950,6 +965,113 @@
- self->dup_old + self->dup_new, relname);
- }
- +/*
- + * load_buffer - read the heap page an index tuple points at and locate the
- + * heap tuple on it.
- + *
- + * heap:   relation handed to ReadBuffer().
- + * itup:   index tuple whose t_tid supplies the block and offset numbers.
- + * tuple:  OUT - only t_data is written: it points at the item inside the
- + *         page when the line pointer is normal, and is NULL otherwise.
- + * itemid: OUT - the line pointer found at that offset.
- + *
- + * The page is share-locked only while the line pointer is inspected; the
- + * lock is dropped again before returning.  The returned buffer is therefore
- + * NOT locked, but it is still pinned by ReadBuffer(), so the caller must
- + * hand it to ReleaseBuffer() when done with tuple->t_data.
- + */
- +Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT */, ItemId *itemid /*OUT */ )
- +{
- +    BlockNumber  blknum;
- +    OffsetNumber offnum;    /* an item offset, not a block number */
- +    Buffer       buffer;
- +    Page         page;
- +
- +    blknum = ItemPointerGetBlockNumber(&itup->t_tid);
- +    offnum = ItemPointerGetOffsetNumber(&itup->t_tid);
- +    buffer = ReadBuffer(heap, blknum);
- +
- +    LockBuffer(buffer, BUFFER_LOCK_SHARE);
- +    page = BufferGetPage(buffer);
- +    *itemid = PageGetItemId(page, offnum);
- +    /* Unused, redirected or dead line pointers have no tuple body. */
- +    tuple->t_data = ItemIdIsNormal(*itemid)
- +        ? (HeapTupleHeader) PageGetItem(page, *itemid)
- +        : NULL;
- +    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- +
- +    return buffer;
- +}
- +
- +/*
- + * load_tuple - finish filling in a HeapTupleData and break it into columns.
- + *
- + * heap:     relation whose descriptor describes the tuple.
- + * tuple:    heap tuple to complete; t_len and t_self are written here and
- + *           t_data is read by heap_deform_tuple(), so it must already be set.
- + * itup:     index tuple whose t_tid is copied into tuple->t_self.
- + * itemid:   line pointer supplying the tuple length.
- + * tupdesc:  OUT - descriptor of heap.
- + * ncolumns: OUT - number of attributes in the descriptor.
- + * values:   OUT - palloc'd array of *ncolumns Datums.
- + * nulls:    OUT - palloc'd array of *ncolumns null flags.
- + *
- + * The caller is responsible for pfree()ing *values and *nulls.
- + */
- +void load_tuple(Relation heap,
- +    HeapTuple tuple,
- +    IndexTuple itup,
- +    ItemId itemid,
- +    TupleDesc * tupdesc /* OUT */,
- +    int * ncolumns /* OUT */,
- +    Datum ** values /* OUT */,
- +    bool ** nulls /* OUT */)
- +{
- +    TupleDesc desc = RelationGetDescr(heap);
- +    int       natts = desc->natts;
- +
- +    /* Complete the tuple header fields that heap_deform_tuple relies on. */
- +    tuple->t_self = itup->t_tid;
- +    tuple->t_len = ItemIdGetLength(itemid);
- +
- +    /* Publish the descriptor and allocate one slot per attribute. */
- +    *tupdesc = desc;
- +    *ncolumns = natts;
- +    *values = (Datum *) palloc(natts * sizeof(Datum));
- +    *nulls = (bool *) palloc(natts * sizeof(bool));
- +
- +    /* Split the tuple into per-column values and null flags. */
- +    heap_deform_tuple(tuple, desc, *values, *nulls);
- +}
- +
- +/*
- + * merge_tuples - fill NULL columns of the destination heap tuple with the
- + * corresponding non-NULL columns of the source heap tuple.
- + *
- + * heap:     heap relation holding both tuples.
- + * itup_dst: index tuple pointing at the heap tuple to be updated.
- + * itup_src: index tuple pointing at the heap tuple that supplies values.
- + *
- + * For every column i where the destination is NULL and the source is not,
- + * the source value is copied over.  If at least one column changed, the
- + * destination row is rewritten with simple_heap_update().  Nothing happens
- + * when either line pointer is not a normal item.
- + *
- + * NOTE(review): the rewritten row version may be stored at a different TID
- + * than itup_dst->t_tid; confirm that the caller accounts for this.
- + */
- +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src)
- +{
- +    HeapTupleData tuple_src;
- +    HeapTupleData tuple_dst;
- +    Buffer buffer_src;
- +    Buffer buffer_dst;
- +    ItemId itemid_src;
- +    ItemId itemid_dst;
- +
- +    /* Both pages stay pinned until the end so the t_data pointers stay valid. */
- +    buffer_src = load_buffer(heap, itup_src, &tuple_src, &itemid_src);
- +    buffer_dst = load_buffer(heap, itup_dst, &tuple_dst, &itemid_dst);
- +
- +    /*
- +     * load_buffer() leaves t_data NULL for a non-normal line pointer.  Both
- +     * tuples are deformed below, so both must be present.
- +     */
- +    if (tuple_src.t_data != NULL && tuple_dst.t_data != NULL)
- +    {
- +        int ncolumns_src, ncolumns_dst;
- +        int i;
- +        Datum *values_src = NULL, *values_dst = NULL;
- +        TupleDesc tupdesc_dst, tupdesc_src;
- +        bool *nulls_src = NULL, *nulls_dst = NULL;
- +        bool *do_replace = NULL;
- +        bool tuple_updated = false;
- +
- +        /* load source */
- +        load_tuple(heap, &tuple_src, itup_src, itemid_src, &tupdesc_src, &ncolumns_src, &values_src, &nulls_src);
- +
- +        /* load destination */
- +        load_tuple(heap, &tuple_dst, itup_dst, itemid_dst, &tupdesc_dst, &ncolumns_dst, &values_dst, &nulls_dst);
- +
- +        /*
- +         * Zero-filled so that every column defaults to "keep", including
- +         * any index the loop below does not visit.
- +         */
- +        do_replace = (bool *) palloc0(ncolumns_dst * sizeof(bool));
- +
- +        for (i = 0; i < ncolumns_dst && i < ncolumns_src; ++i)
- +        {
- +            if (nulls_dst[i] && !nulls_src[i])
- +            {
- +                values_dst[i] = values_src[i];
- +                nulls_dst[i] = false;
- +                do_replace[i] = true;
- +                tuple_updated = true;
- +            }
- +        }
- +
- +        if (tuple_updated)
- +        {
- +            HeapTuple new_tuple = heap_modify_tuple(&tuple_dst, tupdesc_dst, values_dst, nulls_dst, do_replace);
- +
- +            simple_heap_update(heap, &(tuple_dst.t_self), new_tuple);
- +            /* heap_modify_tuple() returned a fresh copy; release it. */
- +            heap_freetuple(new_tuple);
- +        }
- +
- +        pfree(do_replace);
- +        pfree(values_src);
- +        pfree(nulls_src);
- +        pfree(values_dst);
- +        pfree(nulls_dst);
- +    }
- +
- +    ReleaseBuffer(buffer_src);
- +    ReleaseBuffer(buffer_dst);
- +}
- +
- char *
- tuple_to_cstring(TupleDesc tupdesc, HeapTuple tuple)
- {
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement