Advertisement
Guest User

pg_bulkload MERGE

a guest
Jan 25th, 2012
229
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 4.42 KB | None | 0 0
  1. diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c
  2. --- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600
  3. +++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600
  4. @@ -398,6 +398,8 @@
  5.     BTReaderTerm(&reader);
  6.  }
  7.  
  8. +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src);
  9. +
  10.  /*
  11.   * _bt_mergeload - Merge two streams of index tuples into new index files.
  12.   */
  13. @@ -462,7 +464,6 @@
  14.                 }
  15.                 else
  16.                 {
  17. -                   // TODO -- BSJ
  18.                     if (on_duplicate == ON_DUPLICATE_KEEP_NEW)
  19.                     {
  20.                         self->dup_old++;
  21. @@ -470,7 +471,21 @@
  22.                             RelationGetRelationName(wstate->index));
  23.                         itup2 = BTReaderGetNextItem(btspool2);
  24.                     }
  25. -                   else
  26. +                   else if (on_duplicate == ON_DUPLICATE_MERGE)
  27. +                   {
  28. +                       self->dup_old++;
  29. +
  30. +                       // merge from itup into itup2 where itup's col[i] is not null
  31. +                       // but itup2's col[i] IS null
  32. +                       merge_tuples(heapRel, itup2, itup);            
  33. +
  34. +                       ItemPointerCopy(&t_tid2, &itup2->t_tid);
  35. +                       self->dup_new++;
  36. +                       remove_duplicate(self, heapRel, itup,
  37. +                           RelationGetRelationName(wstate->index));
  38. +                       itup = BTSpoolGetNextItem(btspool, itup, &should_free);
  39. +                   }
  40. +                   else
  41.                     {
  42.                         ItemPointerCopy(&t_tid2, &itup2->t_tid);
  43.                         self->dup_new++;
  44. @@ -950,6 +965,113 @@
  45.         self->dup_old + self->dup_new, relname);
  46.  }
  47.  
  48. +// Reads the heap page addressed by itup->t_tid; the Buffer is returned UNLOCKED (share lock is taken and dropped inside).
  49. +Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT */, ItemId *itemid /*OUT */ )
  50. +{
  51. +   BlockNumber     blknum;
  52. +   OffsetNumber    offnum;     // offsets are OffsetNumber, not BlockNumber
  53. +   Buffer          buffer;
  54. +   Page            page;
  55. +   // NOTE(review): callers read tuple->t_data after the lock is dropped -- confirm nothing else can modify the page meanwhile
  56. +   blknum = ItemPointerGetBlockNumber(&itup->t_tid);
  57. +   offnum = ItemPointerGetOffsetNumber(&itup->t_tid);
  58. +   buffer = ReadBuffer(heap, blknum);
  59. +   // OUT: *itemid = line pointer at offnum; tuple->t_data = its item, or NULL when the pointer is not normal
  60. +   LockBuffer(buffer, BUFFER_LOCK_SHARE);
  61. +   page = BufferGetPage(buffer);
  62. +   *itemid = PageGetItemId(page, offnum);
  63. +   tuple->t_data = ItemIdIsNormal(*itemid)
  64. +       ? (HeapTupleHeader) PageGetItem(page, *itemid)
  65. +       : NULL;
  66. +   LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  67. +   return buffer;              // caller is responsible for ReleaseBuffer()
  68. +}
  69. +// Fills tuple->t_len/t_self (t_data is not touched here) and deforms the row into freshly palloc'd values/nulls arrays.
  70. +void load_tuple(Relation       heap,
  71. +               HeapTuple       tuple,
  72. +               IndexTuple      itup,
  73. +               ItemId          itemid,
  74. +               TupleDesc *     tupdesc     /* OUT */,
  75. +               int *           ncolumns    /* OUT */,
  76. +               Datum **        values      /* OUT */,
  77. +               bool **         nulls       /* OUT */)
  78. +{
  79. +   TupleDesc   desc = RelationGetDescr(heap);
  80. +   int         natts = desc->natts;
  81. +
  82. +   tuple->t_self = itup->t_tid;                /* heap TID taken from the index entry */
  83. +   tuple->t_len = ItemIdGetLength(itemid);     /* length taken from the line pointer */
  84. +
  85. +   *tupdesc = desc;
  86. +   *ncolumns = natts;
  87. +   *values = (Datum *) palloc(natts * sizeof(Datum));
  88. +   *nulls = (bool *) palloc(natts * sizeof(bool));
  89. +   heap_deform_tuple(tuple, desc, *values, *nulls);    /* split the row into per-column fields */
  90. +}
  91. +// For each column that is NULL in the heap row behind itup_dst but not in the row behind itup_src, copy the source value in.
  92. +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src)
  93. +{
  94. +   HeapTupleData   tuple_src;
  95. +   HeapTupleData   tuple_dst;
  96. +   Buffer          buffer_src;
  97. +   Buffer          buffer_dst;
  98. +   ItemId          itemid_src;
  99. +   ItemId          itemid_dst;
  100. +
  101. +   // load buffers
  102. +   buffer_src = load_buffer(heap, itup_src, &tuple_src, &itemid_src);
  103. +   buffer_dst = load_buffer(heap, itup_dst, &tuple_dst, &itemid_dst);
  104. +   // t_data is NULL for a non-normal line pointer; deforming such a tuple would dereference NULL, so both rows must exist
  105. +   if (tuple_src.t_data != NULL && tuple_dst.t_data != NULL)
  106. +   {
  107. +       int         ncolumns_src, ncolumns_dst;
  108. +       int         i;
  109. +       Datum       *values_src = NULL, *values_dst = NULL;
  110. +       TupleDesc   tupdesc_dst, tupdesc_src;
  111. +       bool        *nulls_src = NULL, *nulls_dst = NULL;
  112. +       bool        * do_replace = NULL;
  113. +       bool        tuple_updated = false;
  114. +
  115. +       // load source
  116. +       load_tuple(heap, &tuple_src, itup_src, itemid_src, &tupdesc_src, &ncolumns_src, &values_src, &nulls_src);
  117. +
  118. +       // load destination
  119. +       load_tuple(heap, &tuple_dst, itup_dst, itemid_dst, &tupdesc_dst, &ncolumns_dst, &values_dst, &nulls_dst);
  120. +       // one flag per destination column; both rows are deformed with the same relation's descriptor
  121. +       do_replace = (bool *) palloc(ncolumns_dst * sizeof(bool));
  122. +
  123. +       for (i = 0; i < ncolumns_dst && i < ncolumns_src; ++i)
  124. +       {
  125. +           do_replace[i] = false;
  126. +
  127. +           if (nulls_dst[i] && !nulls_src[i])
  128. +           {
  129. +               values_dst[i] = values_src[i];
  130. +               nulls_dst[i] = nulls_src[i];
  131. +               do_replace[i] = true;
  132. +
  133. +               // update new row
  134. +               tuple_updated = true;
  135. +           }
  136. +       }
  137. +       // NOTE(review): simple_heap_update writes a new row version -- confirm the merged index entry ends up pointing at it
  138. +       if (tuple_updated)
  139. +       {
  140. +           HeapTuple new_tuple = heap_modify_tuple(&tuple_dst, tupdesc_dst, values_dst, nulls_dst, do_replace);
  141. +           simple_heap_update(heap, &(tuple_dst.t_self), new_tuple);
  142. +           heap_freetuple(new_tuple);  // the modified copy is palloc'd; free it like the arrays below
  143. +       }
  144. +       pfree(do_replace);
  145. +       pfree(values_src);
  146. +       pfree(nulls_src);
  147. +       pfree(values_dst);
  148. +       pfree(nulls_dst);
  149. +   }
  150. +
  151. +   ReleaseBuffer(buffer_src);
  152. +   ReleaseBuffer(buffer_dst);
  153. +}
  154. +
  155.  char *
  156.  tuple_to_cstring(TupleDesc tupdesc, HeapTuple tuple)
  157.  {
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement