/[resiprocate]/main/contrib/db/btree/bt_cursor.c

Contents of /main/contrib/db/btree/bt_cursor.c



Revision 5295
Mon Aug 22 00:30:05 2005 UTC by jason
File MIME type: text/plain
File size: 75019 byte(s)
merged 5270:HEAD from b-directory-reorg
1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2004
5 * Sleepycat Software. All rights reserved.
6 *
7 * $Id: bt_cursor.c,v 11.190 2004/09/22 21:46:32 ubell Exp $
8 */
9
10 #include "db_config.h"
11
12 #ifndef NO_SYSTEM_INCLUDES
13 #include <sys/types.h>
14
15 #include <string.h>
16 #endif
17
18 #include "db_int.h"
19 #include "dbinc/db_page.h"
20 #include "dbinc/db_shash.h"
21 #include "dbinc/btree.h"
22 #include "dbinc/lock.h"
23 #include "dbinc/mp.h"
24
25 static int __bam_bulk __P((DBC *, DBT *, u_int32_t));
26 static int __bam_c_close __P((DBC *, db_pgno_t, int *));
27 static int __bam_c_del __P((DBC *));
28 static int __bam_c_destroy __P((DBC *));
29 static int __bam_c_first __P((DBC *));
30 static int __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
31 static int __bam_c_getstack __P((DBC *));
32 static int __bam_c_last __P((DBC *));
33 static int __bam_c_next __P((DBC *, int, int));
34 static int __bam_c_physdel __P((DBC *));
35 static int __bam_c_prev __P((DBC *));
36 static int __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
37 static int __bam_c_search __P((DBC *,
38 db_pgno_t, const DBT *, u_int32_t, int *));
39 static int __bam_c_writelock __P((DBC *));
40 static int __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
41 static int __bam_getbothc __P((DBC *, DBT *));
42 static int __bam_get_prev __P((DBC *));
43 static int __bam_isopd __P((DBC *, db_pgno_t *));
44
45 /*
46 * Acquire a new page/lock. If we hold a page/lock, discard the page, and
47 * lock-couple the lock.
48 *
49 * !!!
50 * We have to handle both the case where we have a lock to lock-couple and
51 * the case where we don't -- we don't duplicate locks when we duplicate
52 * cursors in a transaction environment, as there's no point if locks are
53 * never discarded. This means that the cursor may or may not hold a lock.
54 * In the case where we are descending the tree we always want to unlock
55 * the held interior page so we use ACQUIRE_COUPLE.
56 */
57 #undef ACQUIRE
58 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) do { \
59 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
60 if ((pagep) != NULL) { \
61 ret = __memp_fput(__mpf, pagep, 0); \
62 pagep = NULL; \
63 } else \
64 ret = 0; \
65 if ((ret) == 0 && STD_LOCKING(dbc)) \
66 ret = __db_lget(dbc, LCK_COUPLE, lpgno, mode, 0, &(lock));\
67 if ((ret) == 0) \
68 ret = __memp_fget(__mpf, &(fpgno), 0, &(pagep)); \
69 } while (0)
70
71 #undef ACQUIRE_COUPLE
72 #define ACQUIRE_COUPLE(dbc, mode, lpgno, lock, fpgno, pagep, ret) do { \
73 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
74 if ((pagep) != NULL) { \
75 ret = __memp_fput(__mpf, pagep, 0); \
76 pagep = NULL; \
77 } else \
78 ret = 0; \
79 if ((ret) == 0 && STD_LOCKING(dbc)) \
80 ret = __db_lget(dbc, \
81 LCK_COUPLE_ALWAYS, lpgno, mode, 0, &(lock)); \
82 if ((ret) == 0) \
83 ret = __memp_fget(__mpf, &(fpgno), 0, &(pagep)); \
84 } while (0)
85
86 /* Acquire a new page/lock for a cursor. */
87 #undef ACQUIRE_CUR
88 #define ACQUIRE_CUR(dbc, mode, p, ret) do { \
89 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
90 if (p != __cp->pgno) \
91 __cp->pgno = PGNO_INVALID; \
92 ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, ret); \
93 if ((ret) == 0) { \
94 __cp->pgno = p; \
95 __cp->lock_mode = (mode); \
96 } \
97 } while (0)
98
99 /*
100 * Acquire a new page/lock for a cursor and release the previous.
101 * This is typically used when descending a tree and we do not
102 * want to hold the interior nodes locked.
103 */
104 #undef ACQUIRE_CUR_COUPLE
105 #define ACQUIRE_CUR_COUPLE(dbc, mode, p, ret) do { \
106 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
107 if (p != __cp->pgno) \
108 __cp->pgno = PGNO_INVALID; \
109 ACQUIRE_COUPLE(dbc, mode, p, __cp->lock, p, __cp->page, ret); \
110 if ((ret) == 0) { \
111 __cp->pgno = p; \
112 __cp->lock_mode = (mode); \
113 } \
114 } while (0)
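/*
 * Illustrative sketch of how the cursor-acquire macros above are used when
 * descending the tree: each pass re-points the cursor at a child page while
 * the macro releases the previously held page and lock-couples the lock.
 * This is only a sketch, assuming the usual page helpers (ISLEAF,
 * GET_BINTERNAL) and a search-selected slot "indx"; it is not copied from
 * any one caller.
 *
 *	pgno = cp->root;
 *	for (;;) {
 *		ACQUIRE_CUR_COUPLE(dbc, DB_LOCK_READ, pgno, ret);
 *		if (ret != 0)
 *			break;
 *		if (ISLEAF(cp->page))
 *			break;
 *		pgno = GET_BINTERNAL(dbc->dbp, cp->page, indx)->pgno;
 *	}
 */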
115
116 /*
117 * Acquire a write lock if we don't already have one.
118 *
119 * !!!
120 * See ACQUIRE macro on why we handle cursors that don't have locks.
121 */
122 #undef ACQUIRE_WRITE_LOCK
123 #define ACQUIRE_WRITE_LOCK(dbc, ret) do { \
124 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
125 ret = 0; \
126 if (STD_LOCKING(dbc) && \
127 __cp->lock_mode != DB_LOCK_WRITE && \
128 ((ret) = __db_lget(dbc, \
129 LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0, \
130 __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0) \
131 __cp->lock_mode = DB_LOCK_WRITE; \
132 } while (0)
133
134 /* Discard the current page/lock for a cursor. */
135 #undef DISCARD_CUR
136 #define DISCARD_CUR(dbc, ret) do { \
137 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \
138 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \
139 int __t_ret; \
140 if ((__cp->page) != NULL) { \
141 __t_ret = __memp_fput(__mpf, __cp->page, 0); \
142 __cp->page = NULL; \
143 } else \
144 __t_ret = 0; \
145 if (__t_ret != 0 && (ret) == 0) \
146 ret = __t_ret; \
147 __t_ret = __TLPUT((dbc), __cp->lock); \
148 if (__t_ret != 0 && (ret) == 0) \
149 ret = __t_ret; \
150 if ((ret) == 0 && !LOCK_ISSET(__cp->lock)) \
151 __cp->lock_mode = DB_LOCK_NG; \
152 } while (0)
153
154 /* Test whether an on-page item is a deleted record. */
155 #undef IS_DELETED
156 #define IS_DELETED(dbp, page, indx) \
157 B_DISSET(GET_BKEYDATA(dbp, page, \
158 (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
159 #undef IS_CUR_DELETED
160 #define IS_CUR_DELETED(dbc) \
161 IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
162
163 /*
164 * Test to see if two cursors could point to duplicates of the same key.
165 * In the case of off-page duplicates they are the same, as the cursors
166 * will be in the same off-page duplicate tree. In the case of on-page
167 * duplicates, the key index offsets must be the same. For the last test,
168 * as the original cursor may not have a valid page pointer, we use the
169 * current cursor's.
170 */
171 #undef IS_DUPLICATE
172 #define IS_DUPLICATE(dbc, i1, i2) \
173 (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] == \
174 P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
175 #undef IS_CUR_DUPLICATE
176 #define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx) \
177 (F_ISSET(dbc, DBC_OPD) || \
178 (orig_pgno == (dbc)->internal->pgno && \
179 IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
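/*
 * A worked example of the IS_DUPLICATE test above (descriptive only): on a
 * P_LBTREE leaf page each key/data pair occupies two index slots, and a
 * duplicated key is stored once, with every pair in the set pointing at the
 * same on-page key item.  So if the page's index array were
 *
 *	inp[] = { 3800, 3900, 3800, 3700, 3500, 3600 }
 *
 * entries 0 and 2 reference the same key offset (3800), making
 * IS_DUPLICATE(dbc, 0, 2) true, while IS_DUPLICATE(dbc, 2, 4) is false.
 */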
180
181 /*
182 * __bam_c_init --
183 * Initialize the access private portion of a cursor
184 *
185 * PUBLIC: int __bam_c_init __P((DBC *, DBTYPE));
186 */
187 int
188 __bam_c_init(dbc, dbtype)
189 DBC *dbc;
190 DBTYPE dbtype;
191 {
192 DB_ENV *dbenv;
193 int ret;
194
195 dbenv = dbc->dbp->dbenv;
196
197 /* Allocate/initialize the internal structure. */
198 if (dbc->internal == NULL && (ret =
199 __os_malloc(dbenv, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
200 return (ret);
201
202 /* Initialize methods. */
203 dbc->c_close = __db_c_close;
204 dbc->c_count = __db_c_count_pp;
205 dbc->c_del = __db_c_del_pp;
206 dbc->c_dup = __db_c_dup_pp;
207 dbc->c_get = __db_c_get_pp;
208 dbc->c_pget = __db_c_pget_pp;
209 dbc->c_put = __db_c_put_pp;
210 if (dbtype == DB_BTREE) {
211 dbc->c_am_bulk = __bam_bulk;
212 dbc->c_am_close = __bam_c_close;
213 dbc->c_am_del = __bam_c_del;
214 dbc->c_am_destroy = __bam_c_destroy;
215 dbc->c_am_get = __bam_c_get;
216 dbc->c_am_put = __bam_c_put;
217 dbc->c_am_writelock = __bam_c_writelock;
218 } else {
219 dbc->c_am_bulk = __bam_bulk;
220 dbc->c_am_close = __bam_c_close;
221 dbc->c_am_del = __ram_c_del;
222 dbc->c_am_destroy = __bam_c_destroy;
223 dbc->c_am_get = __ram_c_get;
224 dbc->c_am_put = __ram_c_put;
225 dbc->c_am_writelock = __bam_c_writelock;
226 }
227
228 return (0);
229 }
230
231 /*
232 * __bam_c_refresh
233 * Set things up properly for cursor re-use.
234 *
235 * PUBLIC: int __bam_c_refresh __P((DBC *));
236 */
237 int
238 __bam_c_refresh(dbc)
239 DBC *dbc;
240 {
241 BTREE *t;
242 BTREE_CURSOR *cp;
243 DB *dbp;
244
245 dbp = dbc->dbp;
246 t = dbp->bt_internal;
247 cp = (BTREE_CURSOR *)dbc->internal;
248
249 /*
250 * If our caller set the root page number, it's because the root was
251 * known. This is always the case for off page dup cursors. Else,
252 * pull it out of our internal information.
253 */
254 if (cp->root == PGNO_INVALID)
255 cp->root = t->bt_root;
256
257 LOCK_INIT(cp->lock);
258 cp->lock_mode = DB_LOCK_NG;
259
260 cp->sp = cp->csp = cp->stack;
261 cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
262
263 /*
264 * The btree leaf page data structures require that two key/data pairs
265 * (or four items) fit on a page, but other than that there's no fixed
266 * requirement. The btree off-page duplicates only require two items,
267 * to be exact, but requiring four for them as well seems reasonable.
268 *
269 * Recno uses the btree bt_ovflsize value -- it's close enough.
270 */
271 cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
272 dbp, F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
273
274 cp->recno = RECNO_OOB;
275 cp->order = INVALID_ORDER;
276 cp->flags = 0;
277
278 /* Initialize for record numbers. */
279 if (F_ISSET(dbc, DBC_OPD) ||
280 dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
281 F_SET(cp, C_RECNUM);
282
283 /*
284 * All btrees that support record numbers, standard recno trees
285 * configured to renumber, and all off-page duplicate recno trees
286 * have mutable record numbers.
287 */
288 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
289 F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
290 F_SET(cp, C_RENUMBER);
291 }
292
293 return (0);
294 }
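/*
 * Application-level sketch of the knob behind cp->ovflsize above: the
 * bt_minkey value (DB->set_bt_minkey) fixes how many key/data pairs must
 * fit on a leaf page, and items larger than the derived threshold move to
 * overflow pages.  Minimal sketch using the public DB API, with error
 * handling omitted; the page size and minkey values are arbitrary.
 *
 *	DB *dbp;
 *
 *	(void)db_create(&dbp, NULL, 0);
 *	(void)dbp->set_pagesize(dbp, 8 * 1024);
 *	(void)dbp->set_bt_minkey(dbp, 4);
 *	(void)dbp->open(dbp, NULL, "keys.db", NULL, DB_BTREE, DB_CREATE, 0);
 */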
295
296 /*
297 * __bam_c_close --
298 * Close down the cursor.
299 */
300 static int
301 __bam_c_close(dbc, root_pgno, rmroot)
302 DBC *dbc;
303 db_pgno_t root_pgno;
304 int *rmroot;
305 {
306 BTREE_CURSOR *cp, *cp_opd, *cp_c;
307 DB *dbp;
308 DBC *dbc_opd, *dbc_c;
309 DB_MPOOLFILE *mpf;
310 PAGE *h;
311 int cdb_lock, ret;
312
313 dbp = dbc->dbp;
314 mpf = dbp->mpf;
315 cp = (BTREE_CURSOR *)dbc->internal;
316 cp_opd = (dbc_opd = cp->opd) == NULL ?
317 NULL : (BTREE_CURSOR *)dbc_opd->internal;
318 cdb_lock = ret = 0;
319
320 /*
321 * There are 3 ways this function is called:
322 *
323 * 1. Closing a primary cursor: we get called with a pointer to a
324 * primary cursor that has a NULL opd field. This happens when
325 * closing a btree/recno database cursor without an associated
326 * off-page duplicate tree.
327 *
328 * 2. Closing a primary and an off-page duplicate cursor stack: we
329 * get called with a pointer to the primary cursor which has a
330 * non-NULL opd field. This happens when closing a btree cursor
331 * into a database with an associated off-page btree/recno duplicate
332 * tree. (It can't be a primary recno database, recno databases
333 * don't support duplicates.)
334 *
335 * 3. Closing an off-page duplicate cursor stack: we get called with
336 * a pointer to the off-page duplicate cursor. This happens when
337 * closing a non-btree database that has an associated off-page
338 * btree/recno duplicate tree or for a btree database when the
339 * opd tree is not empty (root_pgno == PGNO_INVALID).
340 *
341 * If either the primary or off-page duplicate cursor deleted a btree
342 * key/data pair, check to see if the item is still referenced by a
343 * different cursor. If it is, confirm that cursor's delete flag is
344 * set and leave it to that cursor to do the delete.
345 *
346 * NB: The test for == 0 below is correct. Our caller already removed
347 * our cursor argument from the active queue, so we won't find it when we
348 * search the queue in __bam_ca_delete().
349 * NB: It can't be true that both the primary and off-page duplicate
350 * cursors have deleted a btree key/data pair. Either the primary
351 * cursor may have deleted an item and there's no off-page duplicate
352 * cursor, or there's an off-page duplicate cursor and it may have
353 * deleted an item.
354 *
355 * Primary recno databases aren't an issue here. Recno keys are either
356 * deleted immediately or never deleted, and do not have to be handled
357 * here.
358 *
359 * Off-page duplicate recno databases are an issue here; cases #2 and
360 * #3 above can both be off-page recno databases. The problem is the
361 * same as the final problem for off-page duplicate btree databases.
362 * If we no longer need the off-page duplicate tree, we want to remove
363 * it. For off-page duplicate btrees, we are done with the tree when
364 * we delete the last item it contains, i.e., there can be no further
365 * references to it when it's empty. For off-page duplicate recnos,
366 * we remove items from the tree as the application calls the remove
367 * function, so we are done with the tree when we close the last cursor
368 * that references it.
369 *
370 * We optionally take the root page number from our caller. If the
371 * primary database is a btree, we can get it ourselves because dbc
372 * is the primary cursor. If the primary database is not a btree,
373 * the problem is that we may be dealing with a stack of pages. The
374 * cursor we're using to do the delete points at the bottom of that
375 * stack and we need the top of the stack.
376 */
377 if (F_ISSET(cp, C_DELETED)) {
378 dbc_c = dbc;
379 switch (dbc->dbtype) {
380 case DB_BTREE: /* Case #1, #3. */
381 if (__bam_ca_delete(dbp, cp->pgno, cp->indx, 1) == 0)
382 goto lock;
383 goto done;
384 case DB_RECNO:
385 if (!F_ISSET(dbc, DBC_OPD)) /* Case #1. */
386 goto done;
387 /* Case #3. */
388 if (__ram_ca_delete(dbp, cp->root) == 0)
389 goto lock;
390 goto done;
391 case DB_HASH:
392 case DB_QUEUE:
393 case DB_UNKNOWN:
394 default:
395 return (__db_unknown_type(dbp->dbenv,
396 "__bam_c_close", dbc->dbtype));
397 }
398 }
399
400 if (dbc_opd == NULL)
401 goto done;
402
403 if (F_ISSET(cp_opd, C_DELETED)) { /* Case #2. */
404 /*
405 * We will not have been provided a root page number. Acquire
406 * one from the primary database.
407 */
408 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &h)) != 0)
409 goto err;
410 root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
411 if ((ret = __memp_fput(mpf, h, 0)) != 0)
412 goto err;
413
414 dbc_c = dbc_opd;
415 switch (dbc_opd->dbtype) {
416 case DB_BTREE:
417 if (__bam_ca_delete(
418 dbp, cp_opd->pgno, cp_opd->indx, 1) == 0)
419 goto lock;
420 goto done;
421 case DB_RECNO:
422 if (__ram_ca_delete(dbp, cp_opd->root) == 0)
423 goto lock;
424 goto done;
425 case DB_HASH:
426 case DB_QUEUE:
427 case DB_UNKNOWN:
428 default:
429 return (__db_unknown_type(
430 dbp->dbenv, "__bam_c_close", dbc->dbtype));
431 }
432 }
433 goto done;
434
435 lock: cp_c = (BTREE_CURSOR *)dbc_c->internal;
436
437 /*
438 * If this is CDB, upgrade the lock if necessary. While we acquired
439 * the write lock to logically delete the record, we released it when
440 * we returned from that call, and so may not be holding a write lock
441 * at the moment.
442 */
443 if (CDB_LOCKING(dbp->dbenv)) {
444 if (F_ISSET(dbc, DBC_WRITECURSOR)) {
445 if ((ret = __lock_get(dbp->dbenv,
446 dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt,
447 DB_LOCK_WRITE, &dbc->mylock)) != 0)
448 goto err;
449 cdb_lock = 1;
450 }
451 goto delete;
452 }
453
454 /*
455 * The variable dbc_c has been initialized to reference the cursor in
456 * which we're going to do the delete. Initialize the cursor's lock
457 * structures as necessary.
458 *
459 * First, we may not need to acquire any locks. If we're in case #3,
460 * that is, the primary database isn't a btree database, our caller
461 * is responsible for acquiring any necessary locks before calling us.
462 */
463 if (F_ISSET(dbc, DBC_OPD))
464 goto delete;
465
466 /*
467 * Otherwise, acquire a write lock on the primary database's page.
468 *
469 * Lock the primary database page, regardless of whether we're deleting
470 * an item on a primary database page or an off-page duplicates page.
471 *
472 * If the cursor that did the initial logical deletion (and had a write
473 * lock) is not the same cursor doing the physical deletion (which may
474 * have only ever had a read lock on the item), we need to upgrade to a
475 * write lock. The confusion comes as follows:
476 *
477 * C1 created, acquires item read lock
478 * C2 dup C1, create C2, also has item read lock.
479 * C1 acquire write lock, delete item
480 * C1 close
481 * C2 close, needs a write lock to physically delete item.
482 *
483 * If we're in a TXN, we know that C2 will be able to acquire the write
484 * lock, because no locker other than the one shared by C1 and C2 can
485 * acquire a write lock -- the original write lock C1 acquired was never
486 * discarded.
487 *
488 * If we're not in a TXN, it's nastier. Other cursors might acquire
489 * read locks on the item after C1 closed, discarding its write lock,
490 * and such locks would prevent C2 from acquiring a write lock. That's
491 * OK, though, we'll simply wait until we can acquire a write lock, or
492 * we'll deadlock. (Which better not happen, since we're not in a TXN.)
493 *
494 * There are similar scenarios with dirty reads, where the cursor may
495 * have downgraded its write lock to a was-write lock.
496 */
497 if (STD_LOCKING(dbc))
498 if ((ret = __db_lget(dbc,
499 LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
500 goto err;
501
502 delete: /*
503 * If the delete occurred in a Btree, we're going to look at the page
504 * to see if the item has to be physically deleted. Otherwise, we do
505 * not need the actual page (and it may not even exist; it might have
506 * been truncated from the file after an aborted allocation).
507 *
508 * Delete the on-page physical item referenced by the cursor.
509 */
510 if (dbc_c->dbtype == DB_BTREE) {
511 if ((ret = __memp_fget(mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
512 goto err;
513 if ((ret = __bam_c_physdel(dbc_c)) != 0)
514 goto err;
515 }
516
517 /*
518 * If we're not working in an off-page duplicate tree, then we're
519 * done.
520 */
521 if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
522 goto done;
523
524 /*
525 * We may have just deleted the last element in the off-page duplicate
526 * tree, and closed the last cursor in the tree. For an off-page btree
527 * there are, by definition, no other cursors in the tree if the tree is
528 * empty. For an off-page recno we know we have closed the last cursor
529 * in the tree because the __ram_ca_delete call above returned 0 only
530 * in that case. So, if the off-page duplicate tree is empty at this
531 * point, we want to remove it.
532 */
533 if ((ret = __memp_fget(mpf, &root_pgno, 0, &h)) != 0)
534 goto err;
535 if (NUM_ENT(h) == 0) {
536 DISCARD_CUR(dbc_c, ret);
537 if (ret != 0)
538 goto err;
539 if ((ret = __db_free(dbc, h)) != 0)
540 goto err;
541 } else {
542 if ((ret = __memp_fput(mpf, h, 0)) != 0)
543 goto err;
544 goto done;
545 }
546
547 /*
548 * When removing the tree, we have to do one of two things. If this is
549 * case #2, that is, the primary tree is a btree, delete the key that's
550 * associated with the tree from the btree leaf page. We know we are
551 * the only reference to it and we already have the correct lock. We
552 * detect this case because the cursor that was passed to us references
553 * an off-page duplicate cursor.
554 *
555 * If this is case #3, that is, the primary tree isn't a btree, pass
556 * the information back to our caller, it's their job to do cleanup on
557 * the primary page.
558 */
559 if (dbc_opd != NULL) {
560 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
561 goto err;
562 if ((ret = __bam_c_physdel(dbc)) != 0)
563 goto err;
564 } else
565 *rmroot = 1;
566 err:
567 done: /*
568 * Discard the page references and locks, and confirm that the stack
569 * has been emptied.
570 */
571 if (dbc_opd != NULL)
572 DISCARD_CUR(dbc_opd, ret);
573 DISCARD_CUR(dbc, ret);
574
575 /* Downgrade any CDB lock we acquired. */
576 if (cdb_lock)
577 (void)__lock_downgrade(
578 dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
579
580 return (ret);
581 }
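/*
 * The C1/C2 scenario discussed above, expressed as a sketch of public
 * cursor calls; the physical removal of the item is what __bam_c_close
 * performs once the last referencing cursor goes away.  The dbp/key/data
 * variables are assumed to be set up elsewhere and error handling is
 * omitted.
 *
 *	DBC *c1, *c2;
 *
 *	(void)dbp->cursor(dbp, NULL, &c1, 0);
 *	(void)c1->c_get(c1, &key, &data, DB_FIRST);	read lock on the item
 *	(void)c1->c_dup(c1, &c2, DB_POSITION);		c2 references the item
 *	(void)c1->c_del(c1, 0);				logical delete only
 *	(void)c1->c_close(c1);				c2 still references it
 *	(void)c2->c_close(c2);				physical delete happens here
 */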
582
583 /*
584 * __bam_c_destroy --
585 * Close a single cursor -- internal version.
586 */
587 static int
588 __bam_c_destroy(dbc)
589 DBC *dbc;
590 {
591 /* Discard the structures. */
592 __os_free(dbc->dbp->dbenv, dbc->internal);
593
594 return (0);
595 }
596
597 /*
598 * __bam_c_count --
599 * Return a count of on and off-page duplicates.
600 *
601 * PUBLIC: int __bam_c_count __P((DBC *, db_recno_t *));
602 */
603 int
604 __bam_c_count(dbc, recnop)
605 DBC *dbc;
606 db_recno_t *recnop;
607 {
608 BTREE_CURSOR *cp;
609 DB *dbp;
610 DB_MPOOLFILE *mpf;
611 db_indx_t indx, top;
612 db_recno_t recno;
613 int ret;
614
615 dbp = dbc->dbp;
616 mpf = dbp->mpf;
617 cp = (BTREE_CURSOR *)dbc->internal;
618
619 /*
620 * Called with the top-level cursor that may reference an off-page
621 * duplicates tree. We don't have to acquire any new locks; we must
622 * already hold a read lock to even get here.
623 */
624 if (cp->opd == NULL) {
625 /*
626 * On-page duplicates, get the page and count.
627 */
628 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
629 return (ret);
630
631 /*
632 * Move back to the beginning of the set of duplicates and
633 * then count forward.
634 */
635 for (indx = cp->indx;; indx -= P_INDX)
636 if (indx == 0 ||
637 !IS_DUPLICATE(dbc, indx, indx - P_INDX))
638 break;
639 for (recno = 0,
640 top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) {
641 if (!IS_DELETED(dbp, cp->page, indx))
642 ++recno;
643 if (indx == top ||
644 !IS_DUPLICATE(dbc, indx, indx + P_INDX))
645 break;
646 }
647 } else {
648 /*
649 * Off-page duplicates tree, get the root page of the off-page
650 * duplicate tree.
651 */
652 if ((ret = __memp_fget(
653 mpf, &cp->opd->internal->root, 0, &cp->page)) != 0)
654 return (ret);
655
656 /*
657 * If the page is an internal page use the page's count as it's
658 * up-to-date and reflects the status of cursors in the tree.
659 * If the page is a leaf page for unsorted duplicates, use the
660 * page's count as cursors don't mark items deleted on the page
661 * and wait, cursor delete items immediately.
662 * If the page is a leaf page for sorted duplicates, there may
663 * be cursors on the page marking deleted items -- count.
664 */
665 if (TYPE(cp->page) == P_LDUP)
666 for (recno = 0, indx = 0,
667 top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) {
668 if (!IS_DELETED(dbp, cp->page, indx))
669 ++recno;
670 if (indx == top)
671 break;
672 }
673 else
674 recno = RE_NREC(cp->page);
675 }
676
677 *recnop = recno;
678
679 ret = __memp_fput(mpf, cp->page, 0);
680 cp->page = NULL;
681
682 return (ret);
683 }
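/*
 * Sketch of the public interface that ends up in __bam_c_count: position a
 * cursor on any item of a key, then ask for the number of undeleted
 * duplicates of that key.  The dbp/key/data variables are assumed to be
 * initialized; error handling is omitted.
 *
 *	DBC *dbc;
 *	db_recno_t count;
 *
 *	(void)dbp->cursor(dbp, NULL, &dbc, 0);
 *	if (dbc->c_get(dbc, &key, &data, DB_SET) == 0)
 *		(void)dbc->c_count(dbc, &count, 0);
 *	(void)dbc->c_close(dbc);
 */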
684
685 /*
686 * __bam_c_del --
687 * Delete using a cursor.
688 */
689 static int
690 __bam_c_del(dbc)
691 DBC *dbc;
692 {
693 BTREE_CURSOR *cp;
694 DB *dbp;
695 DB_MPOOLFILE *mpf;
696 int ret, t_ret;
697
698 dbp = dbc->dbp;
699 mpf = dbp->mpf;
700 cp = (BTREE_CURSOR *)dbc->internal;
701 ret = 0;
702
703 /* If the item was already deleted, return failure. */
704 if (F_ISSET(cp, C_DELETED))
705 return (DB_KEYEMPTY);
706
707 /*
708 * This code is always called with a page lock but no page.
709 */
710 DB_ASSERT(cp->page == NULL);
711
712 /*
713 * We don't physically delete the record until the cursor moves, so
714 * we have to have a long-lived write lock on the page instead of
715 * a long-lived read lock. Note, we must already hold a read lock to even
716 * get here.
717 *
718 * If we're maintaining record numbers, we lock the entire tree, else
719 * we lock the single page.
720 */
721 if (F_ISSET(cp, C_RECNUM)) {
722 if ((ret = __bam_c_getstack(dbc)) != 0)
723 goto err;
724 cp->page = cp->csp->page;
725 } else {
726 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, ret);
727 if (ret != 0)
728 goto err;
729 }
730
731 /* Log the change. */
732 if (DBC_LOGGING(dbc)) {
733 if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
734 PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
735 goto err;
736 } else
737 LSN_NOT_LOGGED(LSN(cp->page));
738
739 /* Set the intent-to-delete flag on the page. */
740 if (TYPE(cp->page) == P_LBTREE)
741 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
742 else
743 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
744
745 /* Mark the page dirty. */
746 ret = __memp_fset(mpf, cp->page, DB_MPOOL_DIRTY);
747
748 err: /*
749 * If we've been successful so far and the tree has record numbers,
750 * adjust the record counts. Either way, release acquired page(s).
751 */
752 if (F_ISSET(cp, C_RECNUM)) {
753 if (ret == 0)
754 ret = __bam_adjust(dbc, -1);
755 (void)__bam_stkrel(dbc, 0);
756 } else
757 if (cp->page != NULL &&
758 (t_ret = __memp_fput(mpf, cp->page, 0)) != 0 && ret == 0)
759 ret = t_ret;
760
761 cp->page = NULL;
762
763 /* Update the cursors last, after all chance of failure is past. */
764 if (ret == 0)
765 (void)__bam_ca_delete(dbp, cp->pgno, cp->indx, 1);
766
767 return (ret);
768 }
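/*
 * What the deferred delete above looks like from the application: after
 * DBC->c_del the record is only marked deleted, so a DB_CURRENT get on the
 * same cursor reports DB_KEYEMPTY, and the physical removal happens later,
 * when the cursor moves or is closed.  Minimal sketch; dbc/key/data are
 * assumed to be set up and error handling is omitted.
 *
 *	if (dbc->c_get(dbc, &key, &data, DB_SET) == 0) {
 *		(void)dbc->c_del(dbc, 0);
 *		ret = dbc->c_get(dbc, &key, &data, DB_CURRENT);
 *		(ret is DB_KEYEMPTY: still positioned on the deleted item)
 *	}
 */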
769
770 /*
771 * __bam_c_dup --
772 * Duplicate a btree cursor, such that the new one holds appropriate
773 * locks for the position of the original.
774 *
775 * PUBLIC: int __bam_c_dup __P((DBC *, DBC *));
776 */
777 int
778 __bam_c_dup(orig_dbc, new_dbc)
779 DBC *orig_dbc, *new_dbc;
780 {
781 BTREE_CURSOR *orig, *new;
782 int ret;
783
784 orig = (BTREE_CURSOR *)orig_dbc->internal;
785 new = (BTREE_CURSOR *)new_dbc->internal;
786
787 /*
788 * If we're holding a lock we need to acquire a copy of it, unless
789 * we're in a transaction. We don't need to copy any lock we're
790 * holding inside a transaction because all the locks are retained
791 * until the transaction commits or aborts.
792 */
793 if (orig_dbc->txn == NULL && LOCK_ISSET(orig->lock))
794 if ((ret = __db_lget(new_dbc,
795 0, new->pgno, new->lock_mode, 0, &new->lock)) != 0)
796 return (ret);
797
798 new->ovflsize = orig->ovflsize;
799 new->recno = orig->recno;
800 new->flags = orig->flags;
801
802 return (0);
803 }
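/*
 * Caller-visible side of __bam_c_dup: DBC->c_dup with DB_POSITION creates a
 * second cursor positioned on the same item, and outside a transaction the
 * lock copy made above is what keeps the new cursor's position stable.
 * Sketch only; error handling omitted.
 *
 *	DBC *dbc2;
 *
 *	if (dbc->c_dup(dbc, &dbc2, DB_POSITION) == 0) {
 *		(the two cursors may now be moved independently)
 *		(void)dbc2->c_close(dbc2);
 *	}
 */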
804
805 /*
806 * __bam_c_get --
807 * Get using a cursor (btree).
808 */
809 static int
810 __bam_c_get(dbc, key, data, flags, pgnop)
811 DBC *dbc;
812 DBT *key, *data;
813 u_int32_t flags;
814 db_pgno_t *pgnop;
815 {
816 BTREE_CURSOR *cp;
817 DB *dbp;
818 DB_MPOOLFILE *mpf;
819 db_pgno_t orig_pgno;
820 db_indx_t orig_indx;
821 int exact, newopd, ret;
822
823 dbp = dbc->dbp;
824 mpf = dbp->mpf;
825 cp = (BTREE_CURSOR *)dbc->internal;
826 orig_pgno = cp->pgno;
827 orig_indx = cp->indx;
828
829 newopd = 0;
830 switch (flags) {
831 case DB_CURRENT:
832 /* It's not possible to return a deleted record. */
833 if (F_ISSET(cp, C_DELETED)) {
834 ret = DB_KEYEMPTY;
835 goto err;
836 }
837
838 /*
839 * Acquire the current page. We have at least a read-lock
840 * already. The caller may have set DB_RMW asking for a
841 * write lock, but upgrading to a write lock has no better
842 * chance of succeeding now instead of later, so don't try.
843 */
844 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
845 goto err;
846 break;
847 case DB_FIRST:
848 newopd = 1;
849 if ((ret = __bam_c_first(dbc)) != 0)
850 goto err;
851 break;
852 case DB_GET_BOTH:
853 case DB_GET_BOTH_RANGE:
854 /*
855 * There are two ways to get here based on DBcursor->c_get
856 * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
857 *
858 * 1. Searching a sorted off-page duplicate tree: do a tree
859 * search.
860 *
861 * 2. Searching btree: do a tree search. If it returns a
862 * reference to off-page duplicate tree, return immediately
863 * and let our caller deal with it. If the search doesn't
864 * return a reference to off-page duplicate tree, continue
865 * with an on-page search.
866 */
867 if (F_ISSET(dbc, DBC_OPD)) {
868 if ((ret = __bam_c_search(
869 dbc, PGNO_INVALID, data, flags, &exact)) != 0)
870 goto err;
871 if (flags == DB_GET_BOTH) {
872 if (!exact) {
873 ret = DB_NOTFOUND;
874 goto err;
875 }
876 break;
877 }
878
879 /*
880 * We didn't require an exact match, so the search may
881 * have returned an entry past the end of the page,
882 * or we may be referencing a deleted record. If so,
883 * move to the next entry.
884 */
885 if ((cp->indx == NUM_ENT(cp->page) ||
886 IS_CUR_DELETED(dbc)) &&
887 (ret = __bam_c_next(dbc, 1, 0)) != 0)
888 goto err;
889 } else {
890 if ((ret = __bam_c_search(
891 dbc, PGNO_INVALID, key, flags, &exact)) != 0)
892 return (ret);
893 if (!exact) {
894 ret = DB_NOTFOUND;
895 goto err;
896 }
897
898 if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
899 newopd = 1;
900 break;
901 }
902 if ((ret =
903 __bam_getboth_finddatum(dbc, data, flags)) != 0)
904 goto err;
905 }
906 break;
907 case DB_GET_BOTHC:
908 if ((ret = __bam_getbothc(dbc, data)) != 0)
909 goto err;
910 break;
911 case DB_LAST:
912 newopd = 1;
913 if ((ret = __bam_c_last(dbc)) != 0)
914 goto err;
915 break;
916 case DB_NEXT:
917 newopd = 1;
918 if (cp->pgno == PGNO_INVALID) {
919 if ((ret = __bam_c_first(dbc)) != 0)
920 goto err;
921 } else
922 if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
923 goto err;
924 break;
925 case DB_NEXT_DUP:
926 if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
927 goto err;
928 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
929 ret = DB_NOTFOUND;
930 goto err;
931 }
932 break;
933 case DB_NEXT_NODUP:
934 newopd = 1;
935 if (cp->pgno == PGNO_INVALID) {
936 if ((ret = __bam_c_first(dbc)) != 0)
937 goto err;
938 } else
939 do {
940 if ((ret = __bam_c_next(dbc, 1, 0)) != 0)
941 goto err;
942 } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
943 break;
944 case DB_PREV:
945 newopd = 1;
946 if (cp->pgno == PGNO_INVALID) {
947 if ((ret = __bam_c_last(dbc)) != 0)
948 goto err;
949 } else
950 if ((ret = __bam_c_prev(dbc)) != 0)
951 goto err;
952 break;
953 case DB_PREV_NODUP:
954 newopd = 1;
955 if (cp->pgno == PGNO_INVALID) {
956 if ((ret = __bam_c_last(dbc)) != 0)
957 goto err;
958 } else
959 do {
960 if ((ret = __bam_c_prev(dbc)) != 0)
961 goto err;
962 } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
963 break;
964 case DB_SET:
965 case DB_SET_RECNO:
966 newopd = 1;
967 if ((ret = __bam_c_search(dbc,
968 PGNO_INVALID, key, flags, &exact)) != 0)
969 goto err;
970 break;
971 case DB_SET_RANGE:
972 newopd = 1;
973 if ((ret = __bam_c_search(dbc,
974 PGNO_INVALID, key, flags, &exact)) != 0)
975 goto err;
976
977 /*
978 * As we didn't require an exact match, the search function
979 * may have returned an entry past the end of the page. Or,
980 * we may be referencing a deleted record. If so, move to
981 * the next entry.
982 */
983 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
984 if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
985 goto err;
986 break;
987 default:
988 ret = __db_unknown_flag(dbp->dbenv, "__bam_c_get", flags);
989 goto err;
990 }
991
992 /*
993 * We may have moved to an off-page duplicate tree. Return that
994 * information to our caller.
995 */
996 if (newopd && pgnop != NULL)
997 (void)__bam_isopd(dbc, pgnop);
998
999 /*
1000 * Don't return the key; it was passed to us (this is true even if the
1001 * application defines a compare function returning equality for more
1002 * than one key value, since in that case which actual value we store
1003 * in the database is undefined -- and particularly true in the case of
1004 * duplicates where we only store one key value).
1005 */
1006 if (flags == DB_GET_BOTH ||
1007 flags == DB_GET_BOTH_RANGE || flags == DB_SET)
1008 F_SET(key, DB_DBT_ISSET);
1009
1010 err: /*
1011 * Regardless of whether we were successful or not, if the cursor
1012 * moved, clear the delete flag: DBcursor->c_get never references
1013 * a deleted key if it moved at all.
1014 */
1015 if (F_ISSET(cp, C_DELETED) &&
1016 (cp->pgno != orig_pgno || cp->indx != orig_indx))
1017 F_CLR(cp, C_DELETED);
1018
1019 return (ret);
1020 }
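/*
 * Sketch of two of the flag combinations handled above, as an application
 * issues them: DB_SET_RANGE positions on the smallest key >= the supplied
 * key, while DB_GET_BOTH requires both the key and the data item to match.
 * The DBTs and their contents are assumed; error handling is omitted.
 *
 *	key.data = "m"; key.size = 1;
 *	ret = dbc->c_get(dbc, &key, &data, DB_SET_RANGE);
 *	(ret == 0: cursor now sits on the first key sorting >= "m")
 *
 *	key.data = "midway"; key.size = 6;
 *	data.data = "item-2"; data.size = 6;
 *	ret = dbc->c_get(dbc, &key, &data, DB_GET_BOTH);
 *	(ret == DB_NOTFOUND unless that exact key/data pair exists)
 */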
1021
1022 static int
1023 __bam_get_prev(dbc)
1024 DBC *dbc;
1025 {
1026 BTREE_CURSOR *cp;
1027 DBT key, data;
1028 db_pgno_t pgno;
1029 int ret;
1030
1031 if ((ret = __bam_c_prev(dbc)) != 0)
1032 return (ret);
1033
1034 if (__bam_isopd(dbc, &pgno)) {
1035 cp = (BTREE_CURSOR *)dbc->internal;
1036 if ((ret = __db_c_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1037 return (ret);
1038 if ((ret = cp->opd->c_am_get(cp->opd,
1039 &key, &data, DB_LAST, NULL)) != 0)
1040 return (ret);
1041 }
1042
1043 return (0);
1044 }
1045
1046 /*
1047 * __bam_bulk -- Return bulk data from a btree.
1048 */
1049 static int
1050 __bam_bulk(dbc, data, flags)
1051 DBC *dbc;
1052 DBT *data;
1053 u_int32_t flags;
1054 {
1055 BKEYDATA *bk;
1056 BOVERFLOW *bo;
1057 BTREE_CURSOR *cp;
1058 PAGE *pg;
1059 db_indx_t *inp, indx, pg_keyoff;
1060 int32_t *endp, key_off, *offp, *saveoffp;
1061 u_int8_t *dbuf, *dp, *np;
1062 u_int32_t key_size, pagesize, size, space;
1063 int adj, is_key, need_pg, next_key, no_dup, rec_key, ret;
1064
1065 ret = 0;
1066 key_off = 0;
1067 size = 0;
1068 pagesize = dbc->dbp->pgsize;
1069 cp = (BTREE_CURSOR *)dbc->internal;
1070
1071 /*
1072 * dp tracks the beginning of the page in the buffer.
1073 * np is the next place to copy things into the buffer.
1074 * dbuf always stays at the beginning of the buffer.
1075 */
1076 dbuf = data->data;
1077 np = dp = dbuf;
1078
1079 /* Keep track of the space that is left. There is a termination entry. */
1080 space = data->ulen;
1081 space -= sizeof(*offp);
1082
1083 /* Build the offset/size table from the end up. */
1084 endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
1085 endp--;
1086 offp = endp;
1087
1088 key_size = 0;
1089
1090 /*
1091 * Distinguish between BTREE and RECNO.
1092 * There are no keys in RECNO. If MULTIPLE_KEY is specified
1093 * then we return the record numbers.
1094 * is_key indicates that multiple btree keys are returned.
1095 * rec_key is set if we are returning record numbers.
1096 * next_key is set if we are going after the next key rather than the next dup.
1097 */
1098 if (dbc->dbtype == DB_BTREE) {
1099 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
1100 rec_key = 0;
1101 next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1102 adj = 2;
1103 } else {
1104 is_key = 0;
1105 rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1106 next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1107 adj = 1;
1108 }
1109 no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
1110
1111 next_pg:
1112 indx = cp->indx;
1113 pg = cp->page;
1114
1115 inp = P_INP(dbc->dbp, pg);
1116 /* The current page is not yet in the buffer. */
1117 need_pg = 1;
1118
1119 /*
1120 * Keep track of the offset of the current key on the page.
1121 * If we are returning keys, set it to 0 first so we force
1122 * the copy of the key to the buffer.
1123 */
1124 pg_keyoff = 0;
1125 if (is_key == 0)
1126 pg_keyoff = inp[indx];
1127
1128 do {
1129 if (IS_DELETED(dbc->dbp, pg, indx)) {
1130 if (dbc->dbtype != DB_RECNO)
1131 continue;
1132
1133 cp->recno++;
1134 /*
1135 * If we are not returning recnos then we
1136 * need to fill in every slot so the user
1137 * can calculate the record numbers.
1138 */
1139 if (rec_key != 0)
1140 continue;
1141
1142 space -= 2 * sizeof(*offp);
1143 /* Check if space has underflowed. */
1144 if (space > data->ulen)
1145 goto back_up;
1146
1147 /* Just mark the empty recno slots. */
1148 *offp-- = 0;
1149 *offp-- = 0;
1150 continue;
1151 }
1152
1153 /*
1154 * Check to see if we have a new key.
1155 * If so, then see if we need to put the
1156 * key into the buffer. If it's already there
1157 * then we just point to it.
1158 */
1159 if (is_key && pg_keyoff != inp[indx]) {
1160 bk = GET_BKEYDATA(dbc->dbp, pg, indx);
1161 if (B_TYPE(bk->type) == B_OVERFLOW) {
1162 bo = (BOVERFLOW *)bk;
1163 size = key_size = bo->tlen;
1164 if (key_size > space)
1165 goto get_key_space;
1166 if ((ret = __bam_bulk_overflow(dbc,
1167 bo->tlen, bo->pgno, np)) != 0)
1168 return (ret);
1169 space -= key_size;
1170 key_off = (int32_t)(np - dbuf);
1171 np += key_size;
1172 } else {
1173 if (need_pg) {
1174 dp = np;
1175 size = pagesize - HOFFSET(pg);
1176 if (space < size) {
1177 get_key_space:
1178 /* Nothing added, then error. */
1179 if (offp == endp) {
1180 data->size = (u_int32_t)
1181 DB_ALIGN(size +
1182 pagesize, 1024);
1183 return
1184 (DB_BUFFER_SMALL);
1185 }
1186 /*
1187 * We need to back up to the
1188 * last record put into the
1189 * buffer so that it is
1190 * CURRENT.
1191 */
1192 if (indx != 0)
1193 indx -= P_INDX;
1194 else {
1195 if ((ret =
1196 __bam_get_prev(
1197 dbc)) != 0)
1198 return (ret);
1199 indx = cp->indx;
1200 pg = cp->page;
1201 }
1202 break;
1203 }
1204 /*
1205 * Move the data part of the page
1206 * to the buffer.
1207 */
1208 memcpy(dp,
1209 (u_int8_t *)pg + HOFFSET(pg), size);
1210 need_pg = 0;
1211 space -= size;
1212 np += size;
1213 }
1214 key_size = bk->len;
1215 key_off = (int32_t)((inp[indx] - HOFFSET(pg))
1216 + (dp - dbuf) + SSZA(BKEYDATA, data));
1217 pg_keyoff = inp[indx];
1218 }
1219 }
1220
1221 /*
1222 * Reserve space for the pointers and sizes.
1223 * Either key/data pair or just for a data item.
1224 */
1225 space -= (is_key ? 4 : 2) * sizeof(*offp);
1226 if (rec_key)
1227 space -= sizeof(*offp);
1228
1229 /* Check to see if space has underflowed. */
1230 if (space > data->ulen)
1231 goto back_up;
1232
1233 /*
1234 * Determine if the next record is in the
1235 * buffer already or if it needs to be copied in.
1236 * If we have an off page dup, then copy as many
1237 * as will fit into the buffer.
1238 */
1239 bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
1240 if (B_TYPE(bk->type) == B_DUPLICATE) {
1241 bo = (BOVERFLOW *)bk;
1242 if (is_key) {
1243 *offp-- = (int32_t)key_off;
1244 *offp-- = (int32_t)key_size;
1245 }
1246 /*
1247 * We pass the offset of the current key.
1248 * On return we check whether offp has
1249 * moved, to see if any data fit.
1250 */
1251 saveoffp = offp;
1252 if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
1253 dbuf, is_key ? offp + P_INDX : NULL,
1254 &offp, &np, &space, no_dup)) != 0) {
1255 if (ret == DB_BUFFER_SMALL) {
1256 size = space;
1257 space = 0;
1258 /* If nothing was added, then error. */
1259 if (offp == saveoffp) {
1260 offp += 2;
1261 goto back_up;
1262 }
1263 goto get_space;
1264 }
1265 return (ret);
1266 }
1267 } else if (B_TYPE(bk->type) == B_OVERFLOW) {
1268 bo = (BOVERFLOW *)bk;
1269 size = bo->tlen;
1270 if (size > space)
1271 goto back_up;
1272 if ((ret =
1273 __bam_bulk_overflow(dbc,
1274 bo->tlen, bo->pgno, np)) != 0)
1275 return (ret);
1276 space -= size;
1277 if (is_key) {
1278 *offp-- = (int32_t)key_off;
1279 *offp-- = (int32_t)key_size;
1280 } else if (rec_key)
1281 *offp-- = (int32_t)cp->recno;
1282 *offp-- = (int32_t)(np - dbuf);
1283 np += size;
1284 *offp-- = (int32_t)size;
1285 } else {
1286 if (need_pg) {
1287 dp = np;
1288 size = pagesize - HOFFSET(pg);
1289 if (space < size) {
1290 back_up:
1291 /*
1292 * Back up the index so that the
1293 * last record in the buffer is CURRENT
1294 */
1295 if (indx >= adj)
1296 indx -= adj;
1297 else {
1298 if ((ret =
1299 __bam_get_prev(dbc)) != 0 &&
1300 ret != DB_NOTFOUND)
1301 return (ret);
1302 indx = cp->indx;
1303 pg = cp->page;
1304 }
1305 if (dbc->dbtype == DB_RECNO)
1306 cp->recno--;
1307 get_space:
1308 /*
1309 * See if we put anything in the
1310 * buffer or if we are doing a DBP->get
1311 * did we get all of the data.
1312 */
1313 if (offp >=
1314 (is_key ? &endp[-1] : endp) ||
1315 F_ISSET(dbc, DBC_TRANSIENT)) {
1316 data->size = (u_int32_t)
1317 DB_ALIGN(size +
1318 data->ulen - space, 1024);
1319 return (DB_BUFFER_SMALL);
1320 }
1321 break;
1322 }
1323 memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
1324 need_pg = 0;
1325 space -= size;
1326 np += size;
1327 }
1328 /*
1329 * Add the offsets and sizes to the end of the buffer.
1330 * First add the key info then the data info.
1331 */
1332 if (is_key) {
1333 *offp-- = (int32_t)key_off;
1334 *offp-- = (int32_t)key_size;
1335 } else if (rec_key)
1336 *offp-- = (int32_t)cp->recno;
1337 *offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg))
1338 + (dp - dbuf) + SSZA(BKEYDATA, data));
1339 *offp-- = bk->len;
1340 }
1341 if (dbc->dbtype == DB_RECNO)
1342 cp->recno++;
1343 else if (no_dup) {
1344 while (indx + adj < NUM_ENT(pg) &&
1345 pg_keyoff == inp[indx + adj])
1346 indx += adj;
1347 }
1348 /*
1349 * Stop when we either run off the page or we move to the next key and
1350 * we are not returning multiple keys.
1351 */
1352 } while ((indx += adj) < NUM_ENT(pg) &&
1353 (next_key || pg_keyoff == inp[indx]));
1354
1355 /* If we are off the page then try to move to the next page. */
1356 if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
1357 cp->indx = indx;
1358 ret = __bam_c_next(dbc, 0, 1);
1359 if (ret == 0)
1360 goto next_pg;
1361 if (ret != DB_NOTFOUND)
1362 return (ret);
1363 }
1364
1365 /*
1366 * If we did a DBP->get, we must return an error if we did not return
1367 * all the data for the current key, because the caller has
1368 * no way to know that it did not get it all, nor any
1369 * interface to fetch the balance.
1370 */
1371
1372 if (ret == 0 && indx < pg->entries &&
1373 F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
1374 data->size = (data->ulen - space) + size;
1375 return (DB_BUFFER_SMALL);
1376 }
1377 /*
1378 * Must leave the index pointing at the last record fetched.
1379 * If we are not fetching keys, we may have stepped to the
1380 * next key.
1381 */
1382 if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
1383 cp->indx = indx;
1384 else
1385 cp->indx = indx - P_INDX;
1386
1387 if (rec_key == 1)
1388 *offp = RECNO_OOB;
1389 else
1390 *offp = -1;
1391 return (0);
1392 }
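/*
 * Sketch of the consumer side of __bam_bulk: a DB_MULTIPLE_KEY bulk get
 * fills a single user buffer, and the DB_MULTIPLE_* macros then walk the
 * offset/size table built from the end of that buffer (the offp/endp table
 * above).  The buffer size is arbitrary and error handling is omitted.
 *
 *	DBT key, data;
 *	void *p, *rkey, *rdata;
 *	u_int32_t rklen, rdlen;
 *
 *	memset(&key, 0, sizeof(key));
 *	memset(&data, 0, sizeof(data));
 *	data.ulen = 1024 * 1024;
 *	data.data = malloc(data.ulen);
 *	data.flags = DB_DBT_USERMEM;
 *
 *	while (dbc->c_get(dbc, &key, &data, DB_MULTIPLE_KEY | DB_NEXT) == 0) {
 *		DB_MULTIPLE_INIT(p, &data);
 *		for (;;) {
 *			DB_MULTIPLE_KEY_NEXT(p, &data, rkey, rklen, rdata, rdlen);
 *			if (p == NULL)
 *				break;
 *			(rkey/rklen and rdata/rdlen describe one pair)
 *		}
 *	}
 */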
1393
1394 /*
1395 * __bam_bulk_overflow --
1396 * Dump overflow record into the buffer.
1397 * The space requirements have already been checked.
1398 * PUBLIC: int __bam_bulk_overflow
1399 * PUBLIC: __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
1400 */
1401 int
1402 __bam_bulk_overflow(dbc, len, pgno, dp)
1403 DBC *dbc;
1404 u_int32_t len;
1405 db_pgno_t pgno;
1406 u_int8_t *dp;
1407 {
1408 DBT dbt;
1409
1410 memset(&dbt, 0, sizeof(dbt));
1411 F_SET(&dbt, DB_DBT_USERMEM);
1412 dbt.ulen = len;
1413 dbt.data = (void *)dp;
1414 return (__db_goff(dbc->dbp, &dbt, len, pgno, NULL, NULL));
1415 }
1416
1417 /*
1418 * __bam_bulk_duplicates --
1419 * Put as many off page duplicates as will fit into the buffer.
1420 * This routine will adjust the cursor to reflect the position in
1421 * the overflow tree.
1422 * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
1423 * PUBLIC: db_pgno_t, u_int8_t *, int32_t *,
1424 * PUBLIC: int32_t **, u_int8_t **, u_int32_t *, int));
1425 */
1426 int
1427 __bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
1428 DBC *dbc;
1429 db_pgno_t pgno;
1430 u_int8_t *dbuf;
1431 int32_t *keyoff, **offpp;
1432 u_int8_t **dpp;
1433 u_int32_t *spacep;
1434 int no_dup;
1435 {
1436 DB *dbp;
1437 BKEYDATA *bk;
1438 BOVERFLOW *bo;
1439 BTREE_CURSOR *cp;
1440 DBC *opd;
1441 DBT key, data;
1442 PAGE *pg;
1443 db_indx_t indx, *inp;
1444 int32_t *offp;
1445 u_int32_t pagesize, size, space;
1446 u_int8_t *dp, *np;
1447 int first, need_pg, ret, t_ret;
1448
1449 ret = 0;
1450
1451 dbp = dbc->dbp;
1452 cp = (BTREE_CURSOR *)dbc->internal;
1453 opd = cp->opd;
1454
1455 if (opd == NULL) {
1456 if ((ret = __db_c_newopd(dbc, pgno, NULL, &opd)) != 0)
1457 return (ret);
1458 cp->opd = opd;
1459 if ((ret = opd->c_am_get(opd,
1460 &key, &data, DB_FIRST, NULL)) != 0)
1461 goto close_opd;
1462 }
1463
1464 pagesize = opd->dbp->pgsize;
1465 cp = (BTREE_CURSOR *)opd->internal;
1466 space = *spacep;
1467 /* Get current offset slot. */
1468 offp = *offpp;
1469
1470 /*
1471 * np is the next place to put data.
1472 * dp is the beginning of the current page in the buffer.
1473 */
1474 np = dp = *dpp;
1475 first = 1;
1476 indx = cp->indx;
1477
1478 do {
1479 /* Fetch the current record. No initial move. */
1480 if ((ret = __bam_c_next(opd, 0, 0)) != 0)
1481 break;
1482 pg = cp->page;
1483 indx = cp->indx;
1484 inp = P_INP(dbp, pg);
1485 /* We need to copy the page to the buffer. */
1486 need_pg = 1;
1487
1488 do {
1489 if (IS_DELETED(dbp, pg, indx))
1490 goto contin;
1491 bk = GET_BKEYDATA(dbp, pg, indx);
1492 space -= 2 * sizeof(*offp);
1493 /* Allocate space for key if needed. */
1494 if (first == 0 && keyoff != NULL)
1495 space -= 2 * sizeof(*offp);
1496
1497 /* Did space underflow? */
1498 if (space > *spacep) {
1499 ret = DB_BUFFER_SMALL;
1500 if (first == 1) {
1501 /* Get the absolute value. */
1502 space = -(int32_t)space;
1503 space = *spacep + space;
1504 if (need_pg)
1505 space += pagesize - HOFFSET(pg);
1506 }
1507 break;
1508 }
1509 if (B_TYPE(bk->type) == B_OVERFLOW) {
1510 bo = (BOVERFLOW *)bk;
1511 size = bo->tlen;
1512 if (size > space) {
1513 ret = DB_BUFFER_SMALL;
1514 space = *spacep + size;
1515 break;
1516 }
1517 if (first == 0 && keyoff != NULL) {
1518 *offp-- = keyoff[0];
1519 *offp-- = keyoff[-1];
1520 }
1521 if ((ret = __bam_bulk_overflow(dbc,
1522 bo->tlen, bo->pgno, np)) != 0)
1523 return (ret);
1524 space -= size;
1525 *offp-- = (int32_t)(np - dbuf);
1526 np += size;
1527 } else {
1528 if (need_pg) {
1529 dp = np;
1530 size = pagesize - HOFFSET(pg);
1531 if (space < size) {
1532 ret = DB_BUFFER_SMALL;
1533 /* Return space required. */
1534 space = *spacep + size;
1535 break;
1536 }
1537 memcpy(dp,
1538 (u_int8_t *)pg + HOFFSET(pg), size);
1539 need_pg = 0;
1540 space -= size;
1541 np += size;
1542 }
1543 if (first == 0 && keyoff != NULL) {
1544 *offp-- = keyoff[0];
1545 *offp-- = keyoff[-1];
1546 }
1547 size = bk->len;
1548 *offp-- = (int32_t)((inp[indx] - HOFFSET(pg))
1549 + (dp - dbuf) + SSZA(BKEYDATA, data));
1550 }
1551 *offp-- = (int32_t)size;
1552 first = 0;
1553 if (no_dup)
1554 break;
1555 contin:
1556 indx++;
1557 if (opd->dbtype == DB_RECNO)
1558 cp->recno++;
1559 } while (indx < NUM_ENT(pg));
1560 if (no_dup)
1561 break;
1562 cp->indx = indx;
1563
1564 } while (ret == 0);
1565
1566 /* Return the updated information. */
1567 *spacep = space;
1568 *offpp = offp;
1569 *dpp = np;
1570
1571 /*
1572 * If we ran out of space, back up the pointer.
1573 * If we did not return any dups or we reached the end, close the opd.
1574 */
1575 if (ret == DB_BUFFER_SMALL) {
1576 if (opd->dbtype == DB_RECNO) {
1577 if (--cp->recno == 0)
1578 goto close_opd;
1579 } else if (indx != 0)
1580 cp->indx--;
1581 else {
1582 t_ret = __bam_c_prev(opd);
1583 if (t_ret == DB_NOTFOUND)
1584 goto close_opd;
1585 if (t_ret != 0)
1586 ret = t_ret;
1587 }
1588 } else if (keyoff == NULL && ret == DB_NOTFOUND) {
1589 cp->indx--;
1590 if (opd->dbtype == DB_RECNO)
1591 --cp->recno;
1592 } else if (indx == 0 || ret == DB_NOTFOUND) {
1593 close_opd:
1594 if (ret == DB_NOTFOUND)
1595 ret = 0;
1596 if ((t_ret = __db_c_close(opd)) != 0 && ret == 0)
1597 ret = t_ret;
1598 ((BTREE_CURSOR *)dbc->internal)->opd = NULL;
1599 }
1600 if (ret == DB_NOTFOUND)
1601 ret = 0;
1602
1603 return (ret);
1604 }
1605
1606 /*
1607 * __bam_getbothc --
1608 * Search for a matching data item on a join.
1609 */
1610 static int
1611 __bam_getbothc(dbc, data)
1612 DBC *dbc;
1613 DBT *data;
1614 {
1615 BTREE_CURSOR *cp;
1616 DB *dbp;
1617 DB_MPOOLFILE *mpf;
1618 int cmp, exact, ret;
1619
1620 dbp = dbc->dbp;
1621 mpf = dbp->mpf;
1622 cp = (BTREE_CURSOR *)dbc->internal;
1623
1624 /*
1625 * Acquire the current page. We have at least a read-lock
1626 * already. The caller may have set DB_RMW asking for a
1627 * write lock, but upgrading to a write lock has no better
1628 * chance of succeeding now instead of later, so don't try.
1629 */
1630 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
1631 return (ret);
1632
1633 /*
1634 * An off-page duplicate cursor. Search the remaining duplicates
1635 * for one which matches (do a normal btree search, then verify
1636 * that the retrieved record is greater than the original one).
1637 */
1638 if (F_ISSET(dbc, DBC_OPD)) {
1639 /*
1640 * Check to make sure the desired item comes strictly after
1641 * the current position; if it doesn't, return DB_NOTFOUND.
1642 */
1643 if ((ret = __bam_cmp(dbp, data, cp->page, cp->indx,
1644 dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1645 &cmp)) != 0)
1646 return (ret);
1647
1648 if (cmp <= 0)
1649 return (DB_NOTFOUND);
1650
1651 /* Discard the current page, we're going to do a full search. */
1652 if ((ret = __memp_fput(mpf, cp->page, 0)) != 0)
1653 return (ret);
1654 cp->page = NULL;
1655
1656 return (__bam_c_search(dbc,
1657 PGNO_INVALID, data, DB_GET_BOTH, &exact));
1658 }
1659
1660 /*
1661 * We're doing a DBC->c_get(DB_GET_BOTHC) and we're already searching
1662 * a set of on-page duplicates (either sorted or unsorted). Continue
1663 * a linear search from after the current position.
1664 *
1665 * (Note that we could have just finished a "set" of one duplicate,
1666 * i.e. not a duplicate at all, but the following check will always
1667 * return DB_NOTFOUND in this case, which is the desired behavior.)
1668 */
1669 if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1670 !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1671 return (DB_NOTFOUND);
1672 cp->indx += P_INDX;
1673
1674 return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
1675 }
1676
1677 /*
1678 * __bam_getboth_finddatum --
1679 * Find a matching on-page data item.
1680 */
1681 static int
1682 __bam_getboth_finddatum(dbc, data, flags)
1683 DBC *dbc;
1684 DBT *data;
1685 u_int32_t flags;
1686 {
1687 BTREE_CURSOR *cp;
1688 DB *dbp;
1689 db_indx_t base, lim, top;
1690 int cmp, ret;
1691
1692 COMPQUIET(cmp, 0);
1693
1694 dbp = dbc->dbp;
1695 cp = (BTREE_CURSOR *)dbc->internal;
1696
1697 /*
1698 * Called (sometimes indirectly) from DBC->get to search on-page data
1699 * item(s) for a matching value. If the original flag was DB_GET_BOTH
1700 * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
1701 * item for the key. If the original flag was DB_GET_BOTHC, the cursor
1702 * argument is set to the first data item we can potentially return.
1703 * In both cases, there may or may not be additional duplicate data
1704 * items to search.
1705 *
1706 * If the duplicates are not sorted, do a linear search.
1707 */
1708 if (dbp->dup_compare == NULL) {
1709 for (;; cp->indx += P_INDX) {
1710 if (!IS_CUR_DELETED(dbc) &&
1711 (ret = __bam_cmp(dbp, data, cp->page,
1712 cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
1713 return (ret);
1714 if (cmp == 0)
1715 return (0);
1716
1717 if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1718 !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1719 break;
1720 }
1721 return (DB_NOTFOUND);
1722 }
1723
1724 /*
1725 * If the duplicates are sorted, do a binary search. The reason for
1726 * this is that large pages and small key/data pairs result in large
1727 * numbers of on-page duplicates before they get pushed off-page.
1728 *
1729 * Find the top and bottom of the duplicate set. Binary search
1730 * requires at least two items; don't loop if there's only one.
1731 */
1732 for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
1733 if (!IS_DUPLICATE(dbc, cp->indx, top))
1734 break;
1735 if (base == (top - P_INDX)) {
1736 if ((ret = __bam_cmp(dbp, data,
1737 cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1738 return (ret);
1739 return (cmp == 0 ||
1740 (cmp < 0 && flags == DB_GET_BOTH_RANGE) ? 0 : DB_NOTFOUND);
1741 }
1742
1743 for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
1744 cp->indx = base + ((lim >> 1) * P_INDX);
1745 if ((ret = __bam_cmp(dbp, data, cp->page,
1746 cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1747 return (ret);
1748 if (cmp == 0) {
1749 /*
1750 * XXX
1751 * No duplicate duplicates in sorted duplicate sets,
1752 * so there can be only one.
1753 */
1754 if (!IS_CUR_DELETED(dbc))
1755 return (0);
1756 break;
1757 }
1758 if (cmp > 0) {
1759 base = cp->indx + P_INDX;
1760 --lim;
1761 }
1762 }
1763
1764 /* No match found; if we're looking for an exact match, we're done. */
1765 if (flags == DB_GET_BOTH)
1766 return (DB_NOTFOUND);
1767
1768 /*
1769 * Base is the smallest index greater than the data item; it may be zero
1770 * or a last + O_INDX index, and the item there may be deleted. Find an
1771 * undeleted item.
1772 */
1773 cp->indx = base;
1774 while (cp->indx < top && IS_CUR_DELETED(dbc))
1775 cp->indx += P_INDX;
1776 return (cp->indx < top ? 0 : DB_NOTFOUND);
1777 }
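/*
 * The base/lim loop above is the classic shrinking-window binary search:
 * lim is the number of remaining slots, the probe sits in the middle of the
 * window, and when the probe compares too small the window's base moves
 * just past it.  The same pattern over a plain array, for reference only
 * (not tied to any structure in this file):
 *
 *	for (base = 0, lim = nelem; lim != 0; lim >>= 1) {
 *		probe = base + (lim >> 1);
 *		cmp = compare(target, array[probe]);
 *		if (cmp == 0)
 *			return (probe);
 *		if (cmp > 0) {
 *			base = probe + 1;
 *			--lim;
 *		}
 *	}
 *	(not found; base is now the smallest index whose element > target)
 */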
1778
1779 /*
1780 * __bam_c_put --
1781 * Put using a cursor.
1782 */
1783 static int
1784 __bam_c_put(dbc, key, data, flags, pgnop)
1785 DBC *dbc;
1786 DBT *key, *data;
1787 u_int32_t flags;
1788 db_pgno_t *pgnop;
1789 {
1790 BTREE *t;
1791 BTREE_CURSOR *cp;
1792 DB *dbp;
1793 DBT dbt;
1794 DB_MPOOLFILE *mpf;
1795 db_pgno_t root_pgno;
1796 u_int32_t iiop;
1797 int cmp, exact, own, ret, stack;
1798 void *arg;
1799
1800 dbp = dbc->dbp;
1801 mpf = dbp->mpf;
1802 cp = (BTREE_CURSOR *)dbc->internal;
1803 root_pgno = cp->root;
1804
1805 split: ret = stack = 0;
1806 switch (flags) {
1807 case DB_CURRENT:
1808 if (F_ISSET(cp, C_DELETED))
1809 return (DB_NOTFOUND);
1810 /* FALLTHROUGH */
1811
1812 case DB_AFTER:
1813 case DB_BEFORE:
1814 iiop = flags;
1815 own = 1;
1816
1817 /* Acquire the current page with a write lock. */
1818 ACQUIRE_WRITE_LOCK(dbc, ret);
1819 if (ret != 0)
1820 goto err;
1821 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
1822 goto err;
1823 break;
1824 case DB_KEYFIRST:
1825 case DB_KEYLAST:
1826 case DB_NODUPDATA:
1827 own = 0;
1828 /*
1829 * Searching off-page, sorted duplicate tree: do a tree search
1830 * for the correct item; __bam_c_search returns the smallest
1831 * slot greater than the key, use it.
1832 *
1833 * See comment below regarding where we can start the search.
1834 */
1835 if (F_ISSET(dbc, DBC_OPD)) {
1836 if ((ret = __bam_c_search(dbc,
1837 F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
1838 data, flags, &exact)) != 0)
1839 goto err;
1840 stack = 1;
1841
1842 /* Disallow "sorted" duplicate duplicates. */
1843 if (exact) {
1844 if (IS_DELETED(dbp, cp->page, cp->indx)) {
1845 iiop = DB_CURRENT;
1846 break;
1847 }
1848 ret = __db_duperr(dbp, flags);
1849 goto err;
1850 }
1851 iiop = DB_BEFORE;
1852 break;
1853 }
1854
1855 /*
1856 * Searching a btree.
1857 *
1858 * If we've done a split, we can start the search from the
1859 * parent of the split page, which __bam_split returned
1860 * for us in root_pgno, unless we're in a Btree with record
1861 * numbering. In that case, we'll need the true root page
1862 * in order to adjust the record count.
1863 */
1864 if ((ret = __bam_c_search(dbc,
1865 F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
1866 flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
1867 DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
1868 goto err;
1869 stack = 1;
1870
1871 /*
1872 * If we don't have an exact match, __bam_c_search returned
1873 * the smallest slot greater than the key, use it.
1874 */
1875 if (!exact) {
1876 iiop = DB_KEYFIRST;
1877 break;
1878 }
1879
1880 /*
1881 * If duplicates aren't supported, replace the current item.
1882 * (If implementing the DB->put function, our caller already
1883 * checked the DB_NOOVERWRITE flag.)
1884 */
1885 if (!F_ISSET(dbp, DB_AM_DUP)) {
1886 iiop = DB_CURRENT;
1887 break;
1888 }
1889
1890 /*
1891 * If we find a matching entry, it may be an off-page duplicate
1892 * tree. Return the page number to our caller, we need a new
1893 * cursor.
1894 */
1895 if (pgnop != NULL && __bam_isopd(dbc, pgnop))
1896 goto done;
1897
1898 /* If the duplicates aren't sorted, move to the right slot. */
1899 if (dbp->dup_compare == NULL) {
1900 if (flags == DB_KEYFIRST)
1901 iiop = DB_BEFORE;
1902 else
1903 for (;; cp->indx += P_INDX)
1904 if (cp->indx + P_INDX >=
1905 NUM_ENT(cp->page) ||
1906 !IS_DUPLICATE(dbc, cp->indx,
1907 cp->indx + P_INDX)) {
1908 iiop = DB_AFTER;
1909 break;
1910 }
1911 break;
1912 }
1913
1914 /*
1915 * We know that we're looking at the first of a set of sorted
1916 * on-page duplicates. Walk the list to find the right slot.
1917 */
1918 for (;; cp->indx += P_INDX) {
1919 if ((ret = __bam_cmp(dbp, data, cp->page,
1920 cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1921 goto err;
1922 if (cmp < 0) {
1923 iiop = DB_BEFORE;
1924 break;
1925 }
1926
1927 /* Disallow "sorted" duplicate duplicates. */
1928 if (cmp == 0) {
1929 if (IS_DELETED(dbp, cp->page, cp->indx)) {
1930 iiop = DB_CURRENT;
1931 break;
1932 }
1933 ret = __db_duperr(dbp, flags);
1934 goto err;
1935 }
1936
1937 if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1938 P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
1939 P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
1940 iiop = DB_AFTER;
1941 break;
1942 }
1943 }
1944 break;
1945 default:
1946 ret = __db_unknown_flag(dbp->dbenv, "__bam_c_put", flags);
1947 goto err;
1948 }
1949
1950 switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
1951 case 0:
1952 break;
1953 case DB_NEEDSPLIT:
1954 /*
1955 * To split, we need a key for the page. Either use the key
1956 * argument or get a copy of the key from the page.
1957 */
1958 if (flags == DB_AFTER ||
1959 flags == DB_BEFORE || flags == DB_CURRENT) {
1960 memset(&dbt, 0, sizeof(DBT));
1961 if ((ret = __db_ret(dbp, cp->page, 0, &dbt,
1962 &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
1963 goto err;
1964 arg = &dbt;
1965 } else
1966 arg = F_ISSET(dbc, DBC_OPD) ? data : key;
1967
1968 /*
1969 * Discard any locks and pinned pages (the locks are discarded
1970 * even if we're running with transactions, as they lock pages
1971 * that we're sorry we ever acquired). If stack is set and the
1972 * cursor entries are valid, they point to the same entries as
1973 * the stack, so don't free them twice.
1974 */
1975 if (stack)
1976 ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
1977 else
1978 DISCARD_CUR(dbc, ret);
1979 if (ret != 0)
1980 goto err;
1981
1982 /*
1983 * SR [#6059]
1984 * If we do not own a lock on the page any more, then clear the
1985 * cursor so we don't point at it. Even if we call __bam_stkrel
1986 * above, we still may have entered the routine with the cursor
1987 * positioned at a particular record. This is the case when
1988 * C_RECNUM is set.
1989 */
1990 if (own == 0) {
1991 cp->pgno = PGNO_INVALID;
1992 cp->indx = 0;
1993 }
1994
1995 /* Split the tree. */
1996 if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
1997 return (ret);
1998
1999 goto split;
2000 default:
2001 goto err;
2002 }
2003
2004 err:
2005 done: /*
2006 * If we inserted a key into the first or last slot of the tree,
2007 * remember where it was so we can do it more quickly next time.
2008 * If the tree has record numbers, we need a complete stack so
2009 * that we can adjust the record counts, so skipping the tree search
2010 * isn't possible. For subdatabases we need to be careful that the
2011 * page does not move from one db to another, so we track its LSN.
2012 *
2013 * If there are duplicates and we are inserting into the last slot,
2014 * the cursor will point _to_ the last item, not after it, which
2015 * is why we subtract P_INDX below.
2016 */
2017
2018 t = dbp->bt_internal;
2019 if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
2020 (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
2021 !F_ISSET(cp, C_RECNUM) &&
2022 (!F_ISSET(dbp, DB_AM_SUBDB) ||
2023 (LOGGING_ON(dbp->dbenv) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
2024 ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
2025 cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
2026 (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
2027 t->bt_lpgno = cp->pgno;
2028 if (F_ISSET(dbp, DB_AM_SUBDB))
2029 t->bt_llsn = LSN(cp->page);
2030 } else
2031 t->bt_lpgno = PGNO_INVALID;
2032 /*
2033 * Discard any pages pinned in the tree and their locks, except for
2034 * the leaf page. Note, the leaf page participated in any stack we
2035 * acquired, and so we have to adjust the stack as necessary. If
2036 * there was only a single page on the stack, we don't have to free
2037 * further stack pages.
2038 */
2039 if (stack && BT_STK_POP(cp) != NULL)
2040 (void)__bam_stkrel(dbc, 0);
2041
2042 /*
2043 * Regardless of whether we were successful or not, clear the delete
2044 * flag. If we're successful, we either moved the cursor or the item
2045 * is no longer deleted. If we're not successful, then we're just a
2046 * copy; there's no need to have the flag set.
2047 *
2048 * We may have instantiated off-page duplicate cursors during the put,
2049 * so clear the deleted bit from the off-page duplicate cursor as well.
2050 */
2051 F_CLR(cp, C_DELETED);
2052 if (cp->opd != NULL) {
2053 cp = (BTREE_CURSOR *)cp->opd->internal;
2054 F_CLR(cp, C_DELETED);
2055 }
2056
2057 return (ret);
2058 }
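/*
 * NOTE: illustrative sketch, not part of the original source. It shows how
 * an application reaches the cursor put path above through the public DBC
 * interface; the helper name, the key/data strings, and the
 * BT_CURSOR_EXAMPLES guard are hypothetical, and only documented flags
 * (DB_KEYFIRST, DB_AFTER) are used.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_cursor_put(DB *dbp, DB_TXN *txn)
{
	DBC *dbc;
	DBT key, data;
	int ret, t_ret;

	if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
		return (ret);

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));
	key.data = "fruit";
	key.size = sizeof("fruit");
	data.data = "apple";
	data.size = sizeof("apple");

	/* Insert by key: a DB_KEYFIRST cursor put on a btree. */
	if ((ret = dbc->c_put(dbc, &key, &data, DB_KEYFIRST)) != 0)
		goto err;

	/*
	 * With unsorted duplicates (DB_DUP), add a second data item
	 * immediately after the item the cursor now references; the key
	 * argument is ignored for DB_AFTER.
	 */
	data.data = "pear";
	data.size = sizeof("pear");
	ret = dbc->c_put(dbc, &key, &data, DB_AFTER);

err:	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
#endif /* BT_CURSOR_EXAMPLES */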
2059
2060 /*
2061 * __bam_c_rget --
2062 * Return the record number for a cursor.
2063 *
2064 * PUBLIC: int __bam_c_rget __P((DBC *, DBT *));
2065 */
2066 int
2067 __bam_c_rget(dbc, data)
2068 DBC *dbc;
2069 DBT *data;
2070 {
2071 BTREE_CURSOR *cp;
2072 DB *dbp;
2073 DBT dbt;
2074 DB_MPOOLFILE *mpf;
2075 db_recno_t recno;
2076 int exact, ret, t_ret;
2077
2078 dbp = dbc->dbp;
2079 mpf = dbp->mpf;
2080 cp = (BTREE_CURSOR *)dbc->internal;
2081
2082 /*
2083 * Get the page with the current item on it.
2084 * Get a copy of the key.
2085 * Release the page, making sure we don't release it twice.
2086 */
2087 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &cp->page)) != 0)
2088 return (ret);
2089 memset(&dbt, 0, sizeof(DBT));
2090 if ((ret = __db_ret(dbp, cp->page,
2091 cp->indx, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2092 goto err;
2093 ret = __memp_fput(mpf, cp->page, 0);
2094 cp->page = NULL;
2095 if (ret != 0)
2096 return (ret);
2097
2098 if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
2099 F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
2100 1, &recno, &exact)) != 0)
2101 goto err;
2102
2103 ret = __db_retcopy(dbp->dbenv, data,
2104 &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
2105
2106 /* Release the stack. */
2107 err: if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
2108 ret = t_ret;
2109
2110 return (ret);
2111 }
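/*
 * NOTE: illustrative sketch, not part of the original source. __bam_c_rget
 * backs the public DB_GET_RECNO cursor operation, which is only available
 * on btrees created with DB_RECNUM. The helper name and the
 * BT_CURSOR_EXAMPLES guard are hypothetical.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_cursor_recno(DBC *dbc, db_recno_t *recnop)
{
	DBT key, data;

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));

	/* The record number is returned through the data DBT. */
	data.data = recnop;
	data.ulen = sizeof(db_recno_t);
	data.flags = DB_DBT_USERMEM;

	return (dbc->c_get(dbc, &key, &data, DB_GET_RECNO));
}
#endif /* BT_CURSOR_EXAMPLES */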
2112
2113 /*
2114 * __bam_c_writelock --
2115 * Upgrade the cursor to a write lock.
2116 */
2117 static int
2118 __bam_c_writelock(dbc)
2119 DBC *dbc;
2120 {
2121 BTREE_CURSOR *cp;
2122 int ret;
2123
2124 cp = (BTREE_CURSOR *)dbc->internal;
2125
2126 if (cp->lock_mode == DB_LOCK_WRITE)
2127 return (0);
2128
2129 /*
2130 * When writing to an off-page duplicate tree, we need to have the
2131 * appropriate page in the primary tree locked. The general DBC
2132 * code calls us first with the primary cursor so we can acquire the
2133 * appropriate lock.
2134 */
2135 ACQUIRE_WRITE_LOCK(dbc, ret);
2136 return (ret);
2137 }
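/*
 * NOTE: illustrative sketch, not part of the original source. Applications
 * can avoid this read-to-write lock upgrade (and the deadlocks it can cause)
 * by OR'ing DB_RMW into the cursor read that precedes an update, so the
 * write lock is taken up front. The helper name and the BT_CURSOR_EXAMPLES
 * guard are hypothetical.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_read_for_update(DBC *dbc, DBT *key, DBT *data)
{
	/* Position on the key and acquire a write lock in one step. */
	return (dbc->c_get(dbc, key, data, DB_SET | DB_RMW));
}
#endif /* BT_CURSOR_EXAMPLES */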
2138
2139 /*
2140 * __bam_c_first --
2141 * Return the first record.
2142 */
2143 static int
2144 __bam_c_first(dbc)
2145 DBC *dbc;
2146 {
2147 BTREE_CURSOR *cp;
2148 db_pgno_t pgno;
2149 int ret;
2150
2151 cp = (BTREE_CURSOR *)dbc->internal;
2152 ret = 0;
2153
2154 /* Walk down the left-hand side of the tree. */
2155 for (pgno = cp->root;;) {
2156 ACQUIRE_CUR_COUPLE(dbc, DB_LOCK_READ, pgno, ret);
2157 if (ret != 0)
2158 return (ret);
2159
2160 /* If we find a leaf page, we're done. */
2161 if (ISLEAF(cp->page))
2162 break;
2163
2164 pgno = GET_BINTERNAL(dbc->dbp, cp->page, 0)->pgno;
2165 }
2166
2167 /* If we want a write lock instead of a read lock, get it now. */
2168 if (F_ISSET(dbc, DBC_RMW)) {
2169 ACQUIRE_WRITE_LOCK(dbc, ret);
2170 if (ret != 0)
2171 return (ret);
2172 }
2173
2174 cp->indx = 0;
2175
2176 /* If on an empty page or a deleted record, move to the next one. */
2177 if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
2178 if ((ret = __bam_c_next(dbc, 0, 0)) != 0)
2179 return (ret);
2180
2181 return (0);
2182 }
2183
2184 /*
2185 * __bam_c_last --
2186 * Return the last record.
2187 */
2188 static int
2189 __bam_c_last(dbc)
2190 DBC *dbc;
2191 {
2192 BTREE_CURSOR *cp;
2193 db_pgno_t pgno;
2194 int ret;
2195
2196 cp = (BTREE_CURSOR *)dbc->internal;
2197 ret = 0;
2198
2199 /* Walk down the right-hand side of the tree. */
2200 for (pgno = cp->root;;) {
2201 ACQUIRE_CUR_COUPLE(dbc, DB_LOCK_READ, pgno, ret);
2202 if (ret != 0)
2203 return (ret);
2204
2205 /* If we find a leaf page, we're done. */
2206 if (ISLEAF(cp->page))
2207 break;
2208
2209 pgno = GET_BINTERNAL(dbc->dbp, cp->page,
2210 NUM_ENT(cp->page) - O_INDX)->pgno;
2211 }
2212
2213 /* If we want a write lock instead of a read lock, get it now. */
2214 if (F_ISSET(dbc, DBC_RMW)) {
2215 ACQUIRE_WRITE_LOCK(dbc, ret);
2216 if (ret != 0)
2217 return (ret);
2218 }
2219
2220 cp->indx = NUM_ENT(cp->page) == 0 ? 0 :
2221 NUM_ENT(cp->page) -
2222 (TYPE(cp->page) == P_LBTREE ? P_INDX : O_INDX);
2223
2224 /* If on an empty page or a deleted record, move to the previous one. */
2225 if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
2226 if ((ret = __bam_c_prev(dbc)) != 0)
2227 return (ret);
2228
2229 return (0);
2230 }
2231
2232 /*
2233 * __bam_c_next --
2234 * Move to the next record.
2235 */
2236 static int
2237 __bam_c_next(dbc, initial_move, deleted_okay)
2238 DBC *dbc;
2239 int initial_move, deleted_okay;
2240 {
2241 BTREE_CURSOR *cp;
2242 db_indx_t adjust;
2243 db_lockmode_t lock_mode;
2244 db_pgno_t pgno;
2245 int ret;
2246
2247 cp = (BTREE_CURSOR *)dbc->internal;
2248 ret = 0;
2249
2250 /*
2251 * We're either moving through a page of duplicates or a btree leaf
2252 * page.
2253 *
2254 * !!!
2255 * This code handles empty pages and pages with only deleted entries.
2256 */
2257 if (F_ISSET(dbc, DBC_OPD)) {
2258 adjust = O_INDX;
2259 lock_mode = DB_LOCK_NG;
2260 } else {
2261 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2262 lock_mode =
2263 F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2264 }
2265 if (cp->page == NULL) {
2266 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
2267 if (ret != 0)
2268 return (ret);
2269 }
2270
2271 if (initial_move)
2272 cp->indx += adjust;
2273
2274 for (;;) {
2275 /*
2276 * If at the end of the page, move to a subsequent page.
2277 *
2278 * !!!
2279 * Check for >= NUM_ENT. If the original search landed us on
2280 * NUM_ENT, we may have incremented indx before the test.
2281 */
2282 if (cp->indx >= NUM_ENT(cp->page)) {
2283 if ((pgno
2284 = NEXT_PGNO(cp->page)) == PGNO_INVALID)
2285 return (DB_NOTFOUND);
2286
2287 ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
2288 if (ret != 0)
2289 return (ret);
2290 cp->indx = 0;
2291 continue;
2292 }
2293 if (!deleted_okay && IS_CUR_DELETED(dbc)) {
2294 cp->indx += adjust;
2295 continue;
2296 }
2297 break;
2298 }
2299 return (0);
2300 }
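/*
 * NOTE: illustrative sketch, not part of the original source. The public
 * DB_FIRST/DB_NEXT cursor flags are served by __bam_c_first and __bam_c_next
 * above; empty pages and deleted records are skipped internally, so the
 * hypothetical helper below only ever sees live records.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_walk_forward(DB *dbp)
{
	DBC *dbc;
	DBT key, data;
	int ret, t_ret;

	if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
		return (ret);

	memset(&key, 0, sizeof(key));
	memset(&data, 0, sizeof(data));

	/* DB_NEXT on a freshly opened cursor behaves like DB_FIRST. */
	while ((ret = dbc->c_get(dbc, &key, &data, DB_NEXT)) == 0)
		;	/* Process key.data/data.data here. */
	if (ret == DB_NOTFOUND)
		ret = 0;

	if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
#endif /* BT_CURSOR_EXAMPLES */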
2301
2302 /*
2303 * __bam_c_prev --
2304 * Move to the previous record.
2305 */
2306 static int
2307 __bam_c_prev(dbc)
2308 DBC *dbc;
2309 {
2310 BTREE_CURSOR *cp;
2311 db_indx_t adjust;
2312 db_lockmode_t lock_mode;
2313 db_pgno_t pgno;
2314 int ret;
2315
2316 cp = (BTREE_CURSOR *)dbc->internal;
2317 ret = 0;
2318
2319 /*
2320 * We're either moving through a page of duplicates or a btree leaf
2321 * page.
2322 *
2323 * !!!
2324 * This code handles empty pages and pages with only deleted entries.
2325 */
2326 if (F_ISSET(dbc, DBC_OPD)) {
2327 adjust = O_INDX;
2328 lock_mode = DB_LOCK_NG;
2329 } else {
2330 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2331 lock_mode =
2332 F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2333 }
2334 if (cp->page == NULL) {
2335 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, ret);
2336 if (ret != 0)
2337 return (ret);
2338 }
2339
2340 for (;;) {
2341 /* If at the beginning of the page, move to a previous one. */
2342 if (cp->indx == 0) {
2343 if ((pgno =
2344 PREV_PGNO(cp->page)) == PGNO_INVALID)
2345 return (DB_NOTFOUND);
2346
2347 ACQUIRE_CUR(dbc, lock_mode, pgno, ret);
2348 if (ret != 0)
2349 return (ret);
2350
2351 if ((cp->indx = NUM_ENT(cp->page)) == 0)
2352 continue;
2353 }
2354
2355 /* Ignore deleted records. */
2356 cp->indx -= adjust;
2357 if (IS_CUR_DELETED(dbc))
2358 continue;
2359
2360 break;
2361 }
2362 return (0);
2363 }
2364
2365 /*
2366 * __bam_c_search --
2367 * Move to a specified record.
2368 */
2369 static int
2370 __bam_c_search(dbc, root_pgno, key, flags, exactp)
2371 DBC *dbc;
2372 db_pgno_t root_pgno;
2373 const DBT *key;
2374 u_int32_t flags;
2375 int *exactp;
2376 {
2377 BTREE *t;
2378 BTREE_CURSOR *cp;
2379 DB *dbp;
2380 PAGE *h;
2381 db_indx_t indx, *inp;
2382 db_pgno_t bt_lpgno;
2383 db_recno_t recno;
2384 u_int32_t sflags;
2385 int cmp, ret, t_ret;
2386
2387 dbp = dbc->dbp;
2388 cp = (BTREE_CURSOR *)dbc->internal;
2389 t = dbp->bt_internal;
2390 ret = 0;
2391
2392 /*
2393 * Find an entry in the database. Discard any lock we currently hold;
2394 * we're going to search the tree.
2395 */
2396 DISCARD_CUR(dbc, ret);
2397 if (ret != 0)
2398 return (ret);
2399
2400 switch (flags) {
2401 case DB_SET_RECNO:
2402 if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
2403 return (ret);
2404 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
2405 if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
2406 return (ret);
2407 break;
2408 case DB_SET:
2409 case DB_GET_BOTH:
2410 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
2411 goto search;
2412 case DB_GET_BOTH_RANGE:
2413 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND);
2414 goto search;
2415 case DB_SET_RANGE:
2416 sflags =
2417 (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_DUPFIRST;
2418 goto search;
2419 case DB_KEYFIRST:
2420 sflags = S_KEYFIRST;
2421 goto fast_search;
2422 case DB_KEYLAST:
2423 case DB_NODUPDATA:
2424 sflags = S_KEYLAST;
2425 fast_search: /*
2426 * If the application has a history of inserting into the first
2427 * or last pages of the database, we check those pages first to
2428 * avoid doing a full search.
2429 */
2430 if (F_ISSET(dbc, DBC_OPD))
2431 goto search;
2432
2433 /*
2434 * !!!
2435 * We do not mutex protect the t->bt_lpgno field, which means
2436 * that it can only be used in an advisory manner. If we find
2437 * a page we can use, great. If we don't, we don't care; we do
2438 * it the slow way instead. Regardless, copy it into a local
2439 * variable, otherwise we might acquire a lock for a page and
2440 * then read a different page because it changed underfoot.
2441 */
2442 bt_lpgno = t->bt_lpgno;
2443
2444 /*
2445 * If the tree has no history of insertion, do it the slow way.
2446 */
2447 if (bt_lpgno == PGNO_INVALID)
2448 goto search;
2449
2450 /*
2451 * Lock and retrieve the page on which we last inserted.
2452 *
2453 * The page may not exist: if a transaction created the page
2454 * and then aborted, the page might have been truncated from
2455 * the end of the file.
2456 */
2457 h = NULL;
2458 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, ret);
2459 if (ret != 0) {
2460 if (ret == DB_PAGE_NOTFOUND)
2461 ret = 0;
2462 goto fast_miss;
2463 }
2464
2465 h = cp->page;
2466 inp = P_INP(dbp, h);
2467
2468 /*
2469 * It's okay if the page type isn't right or it's empty; it
2470 * just means that the world changed.
2471 */
2472 if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
2473 goto fast_miss;
2474
2475 /* Verify that this page cannot have moved to another db. */
2476 if (F_ISSET(dbp, DB_AM_SUBDB) &&
2477 log_compare(&t->bt_llsn, &LSN(h)) != 0)
2478 goto fast_miss;
2479 /*
2480 * What we do here is test to see if we're at the beginning or
2481 * end of the tree and if the new item sorts before/after the
2482 * first/last page entry. We don't try to catch inserts into
2483 * the middle of the tree (although we could, as long as there
2484 * were two keys on the page and we saved both the index and
2485 * the page number of the last insert).
2486 */
2487 if (h->next_pgno == PGNO_INVALID) {
2488 indx = NUM_ENT(h) - P_INDX;
2489 if ((ret = __bam_cmp(dbp,
2490 key, h, indx, t->bt_compare, &cmp)) != 0)
2491 return (ret);
2492
2493 if (cmp < 0)
2494 goto try_begin;
2495 if (cmp > 0) {
2496 indx += P_INDX;
2497 goto fast_hit;
2498 }
2499
2500 /*
2501 * Found a duplicate. If doing DB_KEYLAST, we're at
2502 * the correct position; otherwise, move to the first
2503 * of the duplicates. If we're looking at off-page
2504 * duplicates, duplicate duplicates aren't permitted,
2505 * so we're done.
2506 */
2507 if (flags == DB_KEYLAST)
2508 goto fast_hit;
2509 for (;
2510 indx > 0 && inp[indx - P_INDX] == inp[indx];
2511 indx -= P_INDX)
2512 ;
2513 goto fast_hit;
2514 }
2515 try_begin: if (h->prev_pgno == PGNO_INVALID) {
2516 indx = 0;
2517 if ((ret = __bam_cmp(dbp,
2518 key, h, indx, t->bt_compare, &cmp)) != 0)
2519 return (ret);
2520
2521 if (cmp > 0)
2522 goto fast_miss;
2523 if (cmp < 0)
2524 goto fast_hit;
2525
2526 /*
2527 * Found a duplicate. If doing DB_KEYFIRST, we're at
2528 * the correct position; otherwise, move to the last
2529 * of the duplicates. If we're looking at off-page
2530 * duplicates, duplicate duplicates aren't permitted,
2531 * so we're done.
2532 */
2533 if (flags == DB_KEYFIRST)
2534 goto fast_hit;
2535 for (;
2536 indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
2537 inp[indx] == inp[indx + P_INDX];
2538 indx += P_INDX)
2539 ;
2540 goto fast_hit;
2541 }
2542 goto fast_miss;
2543
2544 fast_hit: /* Set the exact match flag, we may have found a duplicate. */
2545 *exactp = cmp == 0;
2546
2547 /*
2548 * Insert the entry in the stack. (Our caller is likely to
2549 * call __bam_stkrel() after our return.)
2550 */
2551 BT_STK_CLR(cp);
2552 BT_STK_ENTER(dbp->dbenv,
2553 cp, h, indx, cp->lock, cp->lock_mode, ret);
2554 if (ret != 0)
2555 return (ret);
2556 break;
2557
2558 fast_miss: /*
2559 * This was not the right page, so we do not need to retain
2560 * the lock even in the presence of transactions.
2561 *
2562 * This is also an error path, so ret may have been set.
2563 */
2564 DISCARD_CUR(dbc, ret);
2565 cp->pgno = PGNO_INVALID;
2566 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
2567 ret = t_ret;
2568 if (ret != 0)
2569 return (ret);
2570
2571 search: if ((ret = __bam_search(dbc, root_pgno,
2572 key, sflags, 1, NULL, exactp)) != 0)
2573 return (ret);
2574 break;
2575 default:
2576 return (__db_unknown_flag(dbp->dbenv, "__bam_c_search", flags));
2577 }
2578
2579 /* Initialize the cursor from the stack. */
2580 cp->page = cp->csp->page;
2581 cp->pgno = cp->csp->page->pgno;
2582 cp->indx = cp->csp->indx;
2583 cp->lock = cp->csp->lock;
2584 cp->lock_mode = cp->csp->lock_mode;
2585
2586 return (0);
2587 }
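/*
 * NOTE: illustrative sketch, not part of the original source. DB_SET_RANGE
 * exercises the search path above: the cursor ends up on the smallest key
 * greater than or equal to the supplied key, and the key DBT is overwritten
 * with the key actually found. The helper name and the BT_CURSOR_EXAMPLES
 * guard are hypothetical.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_seek_at_least(DBC *dbc, void *keybuf, u_int32_t keylen, DBT *datap)
{
	DBT key;

	memset(&key, 0, sizeof(key));
	key.data = keybuf;
	key.size = keylen;

	return (dbc->c_get(dbc, &key, datap, DB_SET_RANGE));
}
#endif /* BT_CURSOR_EXAMPLES */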
2588
2589 /*
2590 * __bam_c_physdel --
2591 * Physically remove an item from the page.
2592 */
2593 static int
2594 __bam_c_physdel(dbc)
2595 DBC *dbc;
2596 {
2597 BTREE_CURSOR *cp;
2598 DB *dbp;
2599 DBT key;
2600 DB_LOCK lock;
2601 DB_MPOOLFILE *mpf;
2602 PAGE *h;
2603 db_pgno_t pgno;
2604 int delete_page, empty_page, exact, level, ret;
2605
2606 dbp = dbc->dbp;
2607 memset(&key, 0, sizeof(DBT));
2608 mpf = dbp->mpf;
2609 cp = (BTREE_CURSOR *)dbc->internal;
2610 delete_page = empty_page = ret = 0;
2611
2612 /* If the page is going to be emptied, consider deleting it. */
2613 delete_page = empty_page =
2614 NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
2615
2616 /*
2617 * Check if the application turned off reverse splits. Applications
2618 * can't turn off reverse splits in off-page duplicate trees; that
2619 * space will never be reused unless the exact same key is specified.
2620 */
2621 if (delete_page &&
2622 !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
2623 delete_page = 0;
2624
2625 /*
2626 * We never delete the last leaf page. (Not really true -- we delete
2627 * the last leaf page of off-page duplicate trees, but that's handled
2628 * by our caller, not down here.)
2629 */
2630 if (delete_page && cp->pgno == cp->root)
2631 delete_page = 0;
2632
2633 /*
2634 * To delete a leaf page other than an empty root page, we need a
2635 * copy of a key from the page. Use the 0th page index since it's
2636 * the last key the page held.
2637 *
2638 * !!!
2639 * Note that because __bam_c_physdel is always called from a cursor
2640 * close, it should be safe to use the cursor's own "my_rkey" memory
2641 * to temporarily hold this key. We shouldn't own any returned-data
2642 * memory of interest--if we do, we're in trouble anyway.
2643 */
2644 if (delete_page)
2645 if ((ret = __db_ret(dbp, cp->page,
2646 0, &key, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2647 return (ret);
2648
2649 /*
2650 * Delete the items. If the page isn't empty, we adjust the cursors.
2651 *
2652 * !!!
2653 * The following operations to delete a page may deadlock. The easy
2654 * scenario is if we're deleting an item because we're closing cursors
2655 * because we've already deadlocked and want to call txn->abort. If
2656 * we fail due to deadlock, we'll leave a locked, possibly empty page
2657 * in the tree, which won't be empty long because we'll undo the delete
2658 * when we undo the transaction's modifications.
2659 *
2660 * !!!
2661 * Delete the key item first, otherwise the on-page duplicate checks
2662 * in __bam_ditem() won't work!
2663 */
2664 if (TYPE(cp->page) == P_LBTREE) {
2665 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2666 return (ret);
2667 if (!empty_page)
2668 if ((ret = __bam_ca_di(dbc,
2669 PGNO(cp->page), cp->indx, -1)) != 0)
2670 return (ret);
2671 }
2672 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2673 return (ret);
2674
2675 /* Clear the deleted flag, the item is gone. */
2676 F_CLR(cp, C_DELETED);
2677
2678 if (!empty_page)
2679 if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
2680 return (ret);
2681
2682 /* If we're not going to try and delete the page, we're done. */
2683 if (!delete_page)
2684 return (0);
2685
2686 /*
2687 * Call __bam_search to reacquire the empty leaf page, but this time
2688 * get both the leaf page and its parent, locked. Jump back up the
2689 * tree until we have the top pair of pages that we want to delete.
2690 * Once we have the top page that we want to delete locked, lock the
2691 * underlying pages and check to make sure they're still empty. If
2692 * they are, delete them.
2693 */
2694 for (level = LEAFLEVEL;; ++level) {
2695 /* Acquire a page and its parent, locked. */
2696 if ((ret = __bam_search(dbc, PGNO_INVALID,
2697 &key, S_WRPAIR, level, NULL, &exact)) != 0)
2698 return (ret);
2699
2700 /*
2701 * If we reach the root or the parent page isn't going to be
2702 * empty when we delete one record, stop.
2703 */
2704 h = cp->csp[-1].page;
2705 if (h->pgno == cp->root || NUM_ENT(h) != 1)
2706 break;
2707
2708 /* Discard the stack, retaining no locks. */
2709 (void)__bam_stkrel(dbc, STK_NOLOCK);
2710 }
2711
2712 /*
2713 * Move the stack pointer one past the last entry; we may be about
2714 * to push more items onto the page stack.
2715 */
2716 ++cp->csp;
2717
2718 /*
2719 * cp->csp[-2].page is now the parent page, which we may or may not be
2720 * going to delete, and cp->csp[-1].page is the first page we know we
2721 * are going to delete. Walk down the chain of pages, acquiring pages
2722 * until we've acquired a leaf page. Generally, this shouldn't happen;
2723 * we should only see a single internal page with one item and a single
2724 * leaf page with no items. The scenario where we could see something
2725 * else is if reverse splits were turned off for a while and then turned
2726 * back on. That could result in all sorts of strangeness, e.g., empty
2727 * pages in the tree, trees that looked like linked lists, and so on.
2728 *
2729 * !!!
2730 * Sheer paranoia: if we find any pages that aren't going to be emptied
2731 * by the delete, someone else added an item while we were walking the
2732 * tree, and we discontinue the delete. Shouldn't be possible, but we
2733 * check regardless.
2734 */
2735 for (h = cp->csp[-1].page;;) {
2736 if (ISLEAF(h)) {
2737 if (NUM_ENT(h) != 0)
2738 break;
2739 break;
2740 } else
2741 if (NUM_ENT(h) != 1)
2742 break;
2743
2744 /*
2745 * Get the next page, write lock it and push it onto the stack.
2746 * We know it's index 0, because it can only have one element.
2747 */
2748 switch (TYPE(h)) {
2749 case P_IBTREE:
2750 pgno = GET_BINTERNAL(dbp, h, 0)->pgno;
2751 break;
2752 case P_IRECNO:
2753 pgno = GET_RINTERNAL(dbp, h, 0)->pgno;
2754 break;
2755 default:
2756 return (__db_pgfmt(dbp->dbenv, PGNO(h)));
2757 }
2758
2759 if ((ret =
2760 __db_lget(dbc, 0, pgno, DB_LOCK_WRITE, 0, &lock)) != 0)
2761 break;
2762 if ((ret = __memp_fget(mpf, &pgno, 0, &h)) != 0)
2763 break;
2764 BT_STK_PUSH(dbp->dbenv, cp, h, 0, lock, DB_LOCK_WRITE, ret);
2765 if (ret != 0)
2766 break;
2767 }
2768
2769 /* Adjust the cursor stack to reference the last page on the stack. */
2770 BT_STK_POP(cp);
2771
2772 /*
2773 * If everything worked, delete the stack; otherwise, release the
2774 * stack and page locks without further damage.
2775 */
2776 if (ret == 0)
2777 DISCARD_CUR(dbc, ret);
2778 if (ret == 0)
2779 ret = __bam_dpages(dbc, cp->sp);
2780 else
2781 (void)__bam_stkrel(dbc, 0);
2782
2783 return (ret);
2784 }
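/*
 * NOTE: illustrative sketch, not part of the original source. The reverse
 * split decided on above can be disabled per database with DB_REVSPLITOFF,
 * which has to be set before the database is opened; as the comments note,
 * it has no effect inside off-page duplicate trees. The helper name, file
 * name and BT_CURSOR_EXAMPLES guard are hypothetical.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_disable_reverse_splits(DB_ENV *dbenv, DB **dbpp)
{
	DB *dbp;
	int ret;

	if ((ret = db_create(&dbp, dbenv, 0)) != 0)
		return (ret);

	/* Keep emptied leaf pages in the tree instead of coalescing them. */
	if ((ret = dbp->set_flags(dbp, DB_REVSPLITOFF)) != 0 ||
	    (ret = dbp->open(dbp, NULL,
	    "example.db", NULL, DB_BTREE, DB_CREATE, 0600)) != 0) {
		(void)dbp->close(dbp, 0);
		return (ret);
	}
	*dbpp = dbp;
	return (0);
}
#endif /* BT_CURSOR_EXAMPLES */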
2785
2786 /*
2787 * __bam_c_getstack --
2788 * Acquire a full stack for a cursor.
2789 */
2790 static int
2791 __bam_c_getstack(dbc)
2792 DBC *dbc;
2793 {
2794 BTREE_CURSOR *cp;
2795 DB *dbp;
2796 DBT dbt;
2797 DB_MPOOLFILE *mpf;
2798 PAGE *h;
2799 int exact, ret, t_ret;
2800
2801 dbp = dbc->dbp;
2802 mpf = dbp->mpf;
2803 cp = (BTREE_CURSOR *)dbc->internal;
2804
2805 /*
2806 * Get the page with the current item on it. The caller of this
2807 * routine has to already hold a read lock on the page, so there
2808 * is no additional lock to acquire.
2809 */
2810 if ((ret = __memp_fget(mpf, &cp->pgno, 0, &h)) != 0)
2811 return (ret);
2812
2813 /* Get a copy of a key from the page. */
2814 memset(&dbt, 0, sizeof(DBT));
2815 if ((ret = __db_ret(dbp,
2816 h, 0, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2817 goto err;
2818
2819 /* Get a write-locked stack for the page. */
2820 exact = 0;
2821 ret = __bam_search(dbc, PGNO_INVALID,
2822 &dbt, S_KEYFIRST, 1, NULL, &exact);
2823
2824 err: /* Discard the key and the page. */
2825 if ((t_ret = __memp_fput(mpf, h, 0)) != 0 && ret == 0)
2826 ret = t_ret;
2827
2828 return (ret);
2829 }
2830
2831 /*
2832 * __bam_isopd --
2833 * Return whether the cursor references an off-page duplicate tree,
2834 * returning its page number via pgnop if so.
2835 */
2836 static int
2837 __bam_isopd(dbc, pgnop)
2838 DBC *dbc;
2839 db_pgno_t *pgnop;
2840 {
2841 BOVERFLOW *bo;
2842
2843 if (TYPE(dbc->internal->page) != P_LBTREE)
2844 return (0);
2845
2846 bo = GET_BOVERFLOW(dbc->dbp,
2847 dbc->internal->page, dbc->internal->indx + O_INDX);
2848 if (B_TYPE(bo->type) == B_DUPLICATE) {
2849 *pgnop = bo->pgno;
2850 return (1);
2851 }
2852 return (0);
2853 }
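/*
 * NOTE: illustrative sketch, not part of the original source. Whether a
 * key's duplicates are stored on the leaf page or in the off-page duplicate
 * tree that __bam_isopd detects is transparent to applications; the public
 * DBC->c_count method reports the number of duplicates either way. The
 * helper name and BT_CURSOR_EXAMPLES guard are hypothetical.
 */
#ifdef BT_CURSOR_EXAMPLES
static int
example_count_duplicates(DBC *dbc, db_recno_t *countp)
{
	/* Count the data items associated with the cursor's current key. */
	return (dbc->c_count(dbc, countp, 0));
}
#endif /* BT_CURSOR_EXAMPLES */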
