mariadb/newbrt/ule.c
Rich Prohaska 68dc93d09b fix some linux icc issues close[t:1975]
git-svn-id: file:///svn/toku/tokudb@14305 c7de825b-a66e-492c-adef-691d508d4ae1
2013-04-16 23:58:00 -04:00

1508 lines
48 KiB
C

/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
// Purpose of this file is to handle all modifications and queries to the database
// at the level of leafentry.
//
// ule = Unpacked Leaf Entry
//
// This design unpacks the leafentry into a convenient format, performs all work
// on the unpacked form, then repacks the leafentry into its compact format.
//
// See design documentation for nested transactions at
// TokuWiki/Imp/TransactionsOverview.
#include <toku_portability.h>
#include "brttypes.h"
#include "brt-internal.h"
// Sorry:
#include "mempool.h"
#include "omt.h"
#include "leafentry.h"
#include "xids.h"
#include "brt_msg.h"
#include "ule.h"
///////////////////////////////////////////////////////////////////////////////////
//
// Question: Can any software outside this file modify or read a leafentry?
// If so, is it worthwhile to put it all here?
//
// There are two entries, one each for modification and query:
// apply_msg_to_leafentry() performs all inserts/deletes/aborts
// do_implicit_promotions_query()
//
//
//
//
//This is what we use to initialize uxrs[0] in a new unpacked leafentry.
const UXR_S committed_delete = {
.type = XR_DELETE,
.vallen = 0,
.xid = 0,
.valp = NULL
}; // static allocation of uxr with type set to committed delete and xid = 0
// Local functions:
static void msg_init_empty_ule(ULE ule, BRT_MSG msg);
static void msg_modify_ule(ULE ule, BRT_MSG msg);
static void ule_init_empty_ule(ULE ule, u_int32_t keylen, void * keyp);
static void ule_do_implicit_promotions(ULE ule, XIDS xids);
static void ule_promote_innermost_to_index(ULE ule, u_int8_t index);
static void ule_apply_insert(ULE ule, XIDS xids, u_int32_t vallen, void * valp);
static void ule_apply_delete(ULE ule, XIDS xids);
static void ule_prepare_for_new_uxr(ULE ule, XIDS xids);
static void ule_apply_abort(ULE ule, XIDS xids);
static void ule_apply_commit(ULE ule, XIDS xids);
static void ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp);
static void ule_push_delete_uxr(ULE ule, TXNID xid);
static void ule_push_placeholder_uxr(ULE ule, TXNID xid);
static UXR ule_get_innermost_uxr(ULE ule);
static UXR ule_get_first_empty_uxr(ULE ule);
static void ule_remove_innermost_uxr(ULE ule);
static TXNID ule_get_innermost_xid(ULE ule);
static TXNID ule_get_xid(ULE ule, u_int8_t index);
static void ule_remove_innermost_placeholders(ULE ule);
static void ule_add_placeholders(ULE ule, XIDS xids);
static inline BOOL uxr_type_is_insert(u_int8_t type);
static inline BOOL uxr_type_is_delete(u_int8_t type);
static inline BOOL uxr_type_is_placeholder(u_int8_t type);
static inline BOOL uxr_is_insert(UXR uxr);
static inline BOOL uxr_is_delete(UXR uxr);
static inline BOOL uxr_is_placeholder(UXR uxr);
static void *
le_malloc(OMT omt, struct mempool *mp, size_t size, void **maybe_free)
{
if (omt)
return mempool_malloc_from_omt(omt, mp, size, maybe_free);
else
return toku_malloc(size);
}
///////////// end scaffolding/upgrade
///////////// ENDTEMP ENDTEMP ENDTEMP ENDTEMP
/////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes
// calls into the next lower layer (msg_xxx) which handles messages.
//
// NOTE: This is the only function (at least in this body of code) that modifies
// a leafentry.
//
// Return 0 if ??? (looking at original code, it seems that it always returns 0).
// ??? How to inform caller that leafentry is to be destroyed?
//
// Temporarily declared as static until we are ready to remove wrapper apply_cmd_to_leaf().
//
int
apply_msg_to_leafentry(BRT_MSG msg, // message to apply to leafentry
LEAFENTRY old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p,
OMT omt,
struct mempool *mp,
void **maybe_free) {
ULE_S ule;
int rval;
if (old_leafentry == NULL) // if leafentry does not exist ...
msg_init_empty_ule(&ule, msg); // ... create empty unpacked leaf entry
else
le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
msg_modify_ule(&ule, msg); // modify unpacked leafentry
rval = le_pack(&ule, // create packed leafentry
new_leafentry_memorysize,
new_leafentry_disksize,
new_leafentry_p,
omt, mp, maybe_free);
return rval;
}
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (msg_xxx)
// knows the accessors of msg, but not of leafentry or unpacked leaf entry.
// It makes calls into the lower layer (le_xxx) which handles leafentries.
// Purpose is to init the ule with given key and no transaction records
//
static void
msg_init_empty_ule(ULE ule, BRT_MSG msg) {
u_int32_t keylen = brt_msg_get_keylen(msg);
void *keyp = brt_msg_get_key(msg);
ule_init_empty_ule(ule, keylen, keyp);
}
// Purpose is to modify the unpacked leafentry in our private workspace.
//
static void
msg_modify_ule(ULE ule, BRT_MSG msg) {
XIDS xids = brt_msg_get_xids(msg);
assert(xids_get_num_xids(xids) <= MAX_TRANSACTION_RECORDS);
ule_do_implicit_promotions(ule, xids);
brt_msg_type type = brt_msg_get_type(msg);
switch (type) {
case BRT_INSERT: ;
u_int32_t vallen = brt_msg_get_vallen(msg);
void * valp = brt_msg_get_val(msg);
ule_apply_insert(ule, xids, vallen, valp);
break;
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
ule_apply_delete(ule, xids);
break;
case BRT_ABORT_ANY:
case BRT_ABORT_BOTH:
ule_apply_abort(ule, xids);
break;
case BRT_COMMIT_ANY:
case BRT_COMMIT_BOTH:
ule_apply_commit(ule, xids);
break;
default:
assert(FALSE /* illegal BRT_MSG.type */);
break;
}
}
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (le_xxx) understands the structure of the leafentry
// and of the unpacked leafentry. It is the only layer that understands the
// structure of leafentry. It has no knowledge of any other data structures.
//
// There are two formats for a packed leaf entry, indicated by the number of
// transaction records:
//
// No uncommitted transactions:
// num = 1 (one byte)
// keylen (4 bytes)
// vallen (4 bytes)
// key (keylen bytes)
// val (vallen bytes)
//
// At least one uncommitted transaction (maybe a committed value as well):
//
// num > 1
// keylen
// vallen of innermost insert
// type of innermost transaction record
// xid of outermost uncommitted transaction
// key
// val of innermost insert
// records excluding extracted data above
// first (innermost) record is missing the type (above)
// innermost insert record is missing the vallen and val
// outermost uncommitted record is missing xid
// outermost record (always committed) is missing xid (implied 0)
// default record:
// type = XR_INSERT or type = XR_PLACEHOLDER or XR_DELETE
// xid xid
// vallen
// val
//
//
#if 0
#if TOKU_WINDOWS
#pragma pack(push, 1)
#endif
//TODO: #1125 Add tests to verify ALL offsets (to verify we used 'pack' right).
// May need to add extra __attribute__((__packed__)) attributes within the definition
struct __attribute__ ((__packed__)) leafentry {
u_int8_t num_xrs;
u_int32_t keylen;
u_int32_t innermost_inserted_vallen;
union {
struct leafentry_committed {
u_int8_t key_val[0]; //Actual key, then actual val
} comm;
struct leafentry_provisional {
u_int8_t innermost_type;
TXNID xid_outermost_uncommitted;
u_int8_t key_val_xrs[]; //Actual key,
//then actual innermost inserted val,
//then transaction records.
} prov;
} u;
};
#if TOKU_WINDOWS
#pragma pack(pop)
#endif
#endif
// Purpose of le_unpack() is to populate our private workspace with the contents of the given le.
void
le_unpack(ULE ule, LEAFENTRY le) {
//Read num_uxrs
ule->num_uxrs = le->num_xrs;
assert(ule->num_uxrs > 0);
//Read the keylen
ule->keylen = toku_dtoh32(le->keylen);
//Read the vallen of innermost insert
u_int32_t vallen_of_innermost_insert = toku_dtoh32(le->innermost_inserted_vallen);
u_int8_t *p;
if (ule->num_uxrs == 1) {
//Unpack a 'committed leafentry' (No uncommitted transactions exist)
ule->keyp = le->u.comm.key_val;
ule->uxrs[0].type = XR_INSERT; //Must be or the leafentry would not exist
ule->uxrs[0].vallen = vallen_of_innermost_insert;
ule->uxrs[0].valp = &le->u.comm.key_val[ule->keylen];
ule->uxrs[0].xid = 0; //Required.
//Set p to immediately after leafentry
p = &le->u.comm.key_val[ule->keylen + vallen_of_innermost_insert];
}
else {
//Unpack a 'provisional leafentry' (Uncommitted transactions exist)
//Read in type.
u_int8_t innermost_type = le->u.prov.innermost_type;
assert(!uxr_type_is_placeholder(innermost_type));
//Read in xid
TXNID xid_outermost_uncommitted = toku_dtoh64(le->u.prov.xid_outermost_uncommitted);
//Read pointer to key
ule->keyp = le->u.prov.key_val_xrs;
//Read pointer to innermost inserted val (immediately after key)
u_int8_t *valp_of_innermost_insert = &le->u.prov.key_val_xrs[ule->keylen];
//Point p to immediately after 'header'
p = &le->u.prov.key_val_xrs[ule->keylen + vallen_of_innermost_insert];
BOOL found_innermost_insert = FALSE;
int i; //Index in ULE.uxrs[]
//Loop inner to outer
for (i = ule->num_uxrs - 1; i >= 0; i--) {
UXR uxr = &ule->uxrs[i];
//Innermost's type is in header.
if (i < ule->num_uxrs - 1) {
//Not innermost, so load the type.
uxr->type = *p;
p += 1;
}
else {
//Innermost, load the type previously read from header
uxr->type = innermost_type;
}
//Committed txn id is implicit (0). (i==0)
//Outermost uncommitted txnid is stored in header. (i==1)
if (i > 1) {
//Not committed nor outermost uncommitted, so load the xid.
uxr->xid = toku_dtoh64(*(TXNID*)p);
p += 8;
}
else if (i == 1) {
//Outermost uncommitted, load the xid previously read from header
uxr->xid = xid_outermost_uncommitted;
}
else {
// i == 0, committed entry
uxr->xid = 0;
}
if (uxr_is_insert(uxr)) {
if (found_innermost_insert) {
//Not the innermost insert. Load vallen/valp
uxr->vallen = toku_dtoh32(*(u_int32_t*)p);
p += 4;
uxr->valp = p;
p += uxr->vallen;
}
else {
//Innermost insert, load the vallen/valp previously read from header
uxr->vallen = vallen_of_innermost_insert;
uxr->valp = valp_of_innermost_insert;
found_innermost_insert = TRUE;
}
}
}
assert(found_innermost_insert);
}
#if ULE_DEBUG
size_t memsize = le_memsize_from_ule(ule);
assert(p == ((u_int8_t*)le) + memsize);
#endif
}
// Purpose is to return a newly allocated leaf entry in packed format, or
// return null if leaf entry should be destroyed (if no transaction records
// are for inserts).
// Transaction records in packed le are stored inner to outer (first xr is innermost),
// with some information extracted out of the transaction records into the header.
// Transaction records in ule are stored outer to inner (uxr[0] is outermost).
int
le_pack(ULE ule, // data to be packed into new leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
OMT omt,
struct mempool *mp,
void **maybe_free) {
int rval;
u_int8_t index_of_innermost_insert;
void *valp_innermost_insert = NULL;
u_int32_t vallen_innermost_insert;
{
//If there are no 'insert' entries, return NO leafentry.
//Loop inner to outer searching for innermost insert.
//uxrs[0] is outermost (committed)
int i;
for (i = ule->num_uxrs - 1; i >= 0; i--) {
if (uxr_is_insert(&ule->uxrs[i])) {
index_of_innermost_insert = (u_int8_t) i;
vallen_innermost_insert = ule->uxrs[i].vallen;
valp_innermost_insert = ule->uxrs[i].valp;
goto found_insert;
}
}
*new_leafentry_p = NULL;
rval = 0;
goto cleanup;
}
found_insert:;
size_t memsize = le_memsize_from_ule(ule);
LEAFENTRY new_leafentry = le_malloc(omt, mp, memsize, maybe_free);
if (new_leafentry==NULL) {
rval = ENOMEM;
goto cleanup;
}
//Universal data
new_leafentry->num_xrs = ule->num_uxrs;
new_leafentry->keylen = toku_htod32(ule->keylen);
new_leafentry->innermost_inserted_vallen = toku_htod32(vallen_innermost_insert);
u_int8_t *p;
//Type (committed/provisional) specific data
if (ule->num_uxrs == 1) {
//Pack a 'committed leafentry' (No uncommitted transactions exist)
//Store actual key.
memcpy(new_leafentry->u.comm.key_val, ule->keyp, ule->keylen);
//Store actual val of innermost insert immediately after actual key
memcpy(&new_leafentry->u.comm.key_val[ule->keylen],
valp_innermost_insert,
vallen_innermost_insert);
//Set p to after leafentry
p = &new_leafentry->u.comm.key_val[ule->keylen + vallen_innermost_insert];
}
else {
//Pack a 'provisional leafentry' (Uncommitted transactions exist)
//Store the type of the innermost transaction record
new_leafentry->u.prov.innermost_type = ule_get_innermost_uxr(ule)->type;
//uxrs[0] is the committed, uxrs[1] is the outermost non-committed
//Store the outermost non-committed xid
new_leafentry->u.prov.xid_outermost_uncommitted = toku_htod64(ule->uxrs[1].xid);
//Store actual key.
memcpy(new_leafentry->u.prov.key_val_xrs, ule->keyp, ule->keylen);
//Store actual val of innermost insert immediately after actual key
memcpy(&new_leafentry->u.prov.key_val_xrs[ule->keylen],
valp_innermost_insert,
vallen_innermost_insert);
//Set p to after 'header'
p = &new_leafentry->u.prov.key_val_xrs[ule->keylen + vallen_innermost_insert];
int i; //index into ULE
//Loop inner to outer
for (i = ule->num_uxrs - 1; i >= 0; i--) {
UXR uxr = &ule->uxrs[i];
//Innermost's type is in header.
if (i < ule->num_uxrs - 1) {
//Not innermost, so record the type.
*p = uxr->type;
p += 1;
}
//Committed txn id is implicit (0). (i==0)
//Outermost uncommitted txnid is stored in header. (i==1)
if (i > 1) {
//Not committed nor outermost uncommitted, so record the xid.
*((TXNID*)p) = toku_htod64(uxr->xid);
p += 8;
}
//Innermost insert's length and value are stored in header.
if (uxr_is_insert(uxr) && i != index_of_innermost_insert) {
//Is an insert, and not the innermost insert, so store length/val
*((u_int32_t*)p) = toku_htod32(uxr->vallen);
p += 4;
memcpy(p, uxr->valp, uxr->vallen); //Store actual val
p += uxr->vallen;
}
}
}
//p points to first unused byte after packed leafentry
size_t bytes_written = (size_t)p - (size_t)new_leafentry;
assert(bytes_written == memsize);
#if ULE_DEBUG
if (omt) { //Disable recursive debugging.
size_t memsize_verify = leafentry_memsize(new_leafentry);
assert(memsize_verify == memsize);
ULE_S ule_tmp;
le_unpack(&ule_tmp, new_leafentry);
memsize_verify = le_memsize_from_ule(&ule_tmp);
assert(memsize_verify == memsize);
//Debugging code inside le_unpack will repack and verify it is the same.
LEAFENTRY le_copy;
int r_tmp = le_pack(&ule_tmp, &memsize_verify, &memsize_verify,
&le_copy, NULL, NULL, NULL);
assert(r_tmp==0);
assert(memsize_verify == memsize);
assert(memcmp(new_leafentry, le_copy, memsize)==0);
toku_free(le_copy);
}
#endif
*new_leafentry_p = (LEAFENTRY)new_leafentry;
*new_leafentry_memorysize = memsize;
*new_leafentry_disksize = memsize;
rval = 0;
cleanup:
return rval;
}
//////////////////////////////////////////////////////////////////////////////////
// Following functions provide convenient access to a packed leafentry.
//Requires:
// Leafentry that ule represents should not be destroyed (is not just all deletes)
size_t
le_memsize_from_ule (ULE ule) {
assert(ule->num_uxrs);
size_t rval;
if (ule->num_uxrs == 1) {
assert(uxr_is_insert(&ule->uxrs[0]));
rval = 1 //num_uxrs
+4 //keylen
+4 //vallen
+ule->keylen //actual key
+ule->uxrs[0].vallen; //actual val
}
else {
rval = 1 //num_uxrs
+4 //keylen
+ule->keylen //actual key
+1*ule->num_uxrs //types
+8*(ule->num_uxrs-1); //txnids
u_int8_t i;
for (i = 0; i < ule->num_uxrs; i++) {
UXR uxr = &ule->uxrs[i];
if (uxr_is_insert(uxr)) {
rval += 4; //vallen
rval += uxr->vallen; //actual val
}
}
}
return rval;
}
#define LE_COMMITTED_MEMSIZE(le, keylen, vallen) \
(sizeof((le)->num_xrs) /* num_uxrs */ \
+sizeof((le)->keylen) /* keylen */ \
+sizeof((le)->innermost_inserted_vallen) /* vallen */ \
+keylen /* actual key */ \
+vallen) /* actual val */
size_t
leafentry_memsize (LEAFENTRY le) {
size_t rval = 0;
//Read num_uxrs
u_int8_t num_uxrs = le->num_xrs;
assert(num_uxrs > 0);
//Read the keylen
u_int32_t keylen = toku_dtoh32(le->keylen);
//Read the vallen of innermost insert
u_int32_t vallen_of_innermost_insert = toku_dtoh32(le->innermost_inserted_vallen);
if (num_uxrs == 1) {
//Committed version (no uncommitted records)
rval = LE_COMMITTED_MEMSIZE(le, keylen, vallen_of_innermost_insert);
}
else {
//A 'provisional leafentry' (Uncommitted transactions exist)
//Read in type.
u_int8_t innermost_type = le->u.prov.innermost_type;
assert(!uxr_type_is_placeholder(innermost_type));
//Set p to immediately after key,val (begginning of transaction records)
u_int8_t *p = &le->u.prov.key_val_xrs[keylen + vallen_of_innermost_insert];
BOOL found_innermost_insert = FALSE;
int i; //would be index in ULE.uxrs[] were we to unpack
//Loop inner to outer
UXR_S current_uxr;
UXR uxr = &current_uxr;
for (i = num_uxrs - 1; i >= 0; i--) {
//Innermost's type is in header.
if (i < num_uxrs - 1) {
//Not innermost, so load the type.
uxr->type = *p;
p += 1;
}
else {
//Innermost, load the type previously read from header
uxr->type = innermost_type;
}
//Committed txn id is implicit (0). (i==0)
//Outermost uncommitted txnid is stored in header. (i==1)
if (i > 1) {
//Not committed nor outermost uncommitted, so load the xid.
p += 8;
}
if (uxr_is_insert(uxr)) {
if (found_innermost_insert) {
//Not the innermost insert. Load vallen/valp
uxr->vallen = toku_dtoh32(*(u_int32_t*)p);
p += 4;
p += uxr->vallen;
}
else
found_innermost_insert = TRUE;
}
}
assert(found_innermost_insert);
rval = (size_t)p - (size_t)le;
}
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
size_t slow_rval = le_memsize_from_ule(&ule);
assert(slow_rval == rval);
#endif
return rval;
}
size_t
leafentry_disksize (LEAFENTRY le) {
return leafentry_memsize(le);
}
// le is normally immutable. This is the only exception.
void
le_full_promotion(LEAFENTRY le,
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize) {
#if ULE_DEBUG
// Create a new le ("slow_le") using normal commit message for comparison.
// Creation of slow_le must be done first, because le is being modified.
assert(le);
assert(le->num_xrs > 1); //Not committed
assert(!le_is_provdel(le));
TXNID outermost_uncommitted_xid = le_outermost_uncommitted_xid(le);
assert(outermost_uncommitted_xid != 0);
size_t old_memsize = leafentry_memsize(le);
u_int32_t old_keylen;
u_int32_t old_vallen;
void *old_key = le_key_and_len(le, &old_keylen);
void *old_val = le_innermost_inserted_val_and_len(le, &old_vallen);
assert(old_key == le_latest_key(le));
assert(old_keylen == le_latest_keylen(le));
assert(old_val == le_latest_val(le));
assert(old_vallen == le_latest_vallen(le));
//Save copies for verification.
old_key = toku_memdup(old_key, old_keylen);
assert(old_key);
old_val = toku_memdup(old_val, old_vallen);
assert(old_val);
BRT_MSG_S slow_full_promotion_msg = {
.type = BRT_COMMIT_ANY,
.u.id = {
.key = NULL,
.val = NULL,
}
};
int r_xids = xids_create_child(xids_get_root_xids(),
&slow_full_promotion_msg.xids,
outermost_uncommitted_xid);
assert(r_xids==0);
size_t slow_new_memsize;
size_t slow_new_disksize;
LEAFENTRY slow_le;
int r_apply = apply_msg_to_leafentry(&slow_full_promotion_msg,
le,
&slow_new_memsize, &slow_new_disksize,
&slow_le,
NULL, NULL, NULL);
assert(r_apply == 0);
assert(slow_new_memsize == slow_new_disksize);
assert(slow_new_memsize < old_memsize);
assert(slow_le);
#endif
//Save keylen for later use.
u_int32_t keylen = le_keylen(le);
//Save innermost inserted vallen for later use.
u_int32_t vallen = le_innermost_inserted_vallen(le);
//Set as committed.
le->num_xrs = 1;
//Keylen is unchanged but we need to extract it.
//Innermost inserted vallen is unchanged but we need to extract it.
//Move key and value using memmove. memcpy does not support overlapping memory.
//Move the key
memmove(le->u.comm.key_val, le->u.prov.key_val_xrs, keylen);
//Move the val
memmove(&le->u.comm.key_val[keylen], &le->u.prov.key_val_xrs[keylen], vallen);
size_t new_memsize = LE_COMMITTED_MEMSIZE(le, keylen, vallen);
*new_leafentry_memorysize = new_memsize;
*new_leafentry_disksize = new_memsize;
#if ULE_DEBUG
// now compare le with "slow_le" created via normal commit message.
assert(*new_leafentry_memorysize == slow_new_memsize); //Size same
assert(*new_leafentry_disksize == slow_new_disksize); //Size same
assert(memcmp(le, slow_le, slow_new_memsize) == 0); //Bitwise the same.
assert(!le_is_provdel(le));
assert(le_outermost_uncommitted_xid(le) == 0);
//Verify key(len), val(len) unchanged.
u_int32_t new_keylen;
u_int32_t new_vallen;
void *new_key = le_key_and_len(le, &new_keylen);
void *new_val = le_innermost_inserted_val_and_len(le, &new_vallen);
assert(new_key == le_latest_key(le));
assert(new_keylen == le_latest_keylen(le));
assert(new_val == le_latest_val(le));
assert(new_vallen == le_latest_vallen(le));
assert(new_keylen == old_keylen);
assert(new_vallen == old_vallen);
assert(memcmp(new_key, old_key, old_keylen) == 0);
assert(memcmp(new_val, old_val, old_vallen) == 0);
xids_destroy(&slow_full_promotion_msg.xids);
toku_free(slow_le);
toku_free(old_key);
toku_free(old_val);
#endif
}
int le_is_provdel(LEAFENTRY le) {
int rval;
u_int8_t num_xrs = le->num_xrs;
if (num_xrs == 1)
rval = 0;
else
rval = uxr_type_is_delete(le->u.prov.innermost_type);
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
int slow_rval = uxr_is_delete(uxr);
assert((rval==0) == (slow_rval==0));
#endif
return rval;
}
//If le_is_provdel, return (NULL,0)
//Else, return (key,keylen)
void*
le_latest_key_and_len (LEAFENTRY le, u_int32_t *len) {
u_int8_t num_xrs = le->num_xrs;
void *keyp;
*len = toku_dtoh32(le->keylen);
if (num_xrs == 1)
keyp = le->u.comm.key_val;
else {
keyp = le->u.prov.key_val_xrs;
if (uxr_type_is_delete(le->u.prov.innermost_type)) {
keyp = NULL;
*len = 0;
}
}
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
void *slow_keyp;
u_int32_t slow_len;
if (uxr_is_insert(uxr)) {
slow_keyp = ule.keyp;
slow_len = ule.keylen;
}
else {
slow_keyp = NULL;
slow_len = 0;
}
assert(slow_keyp == le_latest_key(le));
assert(slow_len == le_latest_keylen(le));
assert(keyp==slow_keyp);
assert(*len==slow_len);
#endif
return keyp;
}
void*
le_latest_key (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
void *rval;
if (num_xrs == 1)
rval = le->u.comm.key_val;
else {
rval = le->u.prov.key_val_xrs;
if (uxr_type_is_delete(le->u.prov.innermost_type))
rval = NULL;
}
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
void *slow_rval;
if (uxr_is_insert(uxr))
slow_rval = ule.keyp;
else
slow_rval = NULL;
assert(rval==slow_rval);
#endif
return rval;
}
u_int32_t
le_latest_keylen (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
u_int32_t rval = toku_dtoh32(le->keylen);
if (num_xrs > 1 && uxr_type_is_delete(le->u.prov.innermost_type))
rval = 0;
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
u_int32_t slow_rval;
if (uxr_is_insert(uxr))
slow_rval = ule.keylen;
else
slow_rval = 0;
assert(rval==slow_rval);
#endif
return rval;
}
void*
le_latest_val_and_len (LEAFENTRY le, u_int32_t *len) {
u_int8_t num_xrs = le->num_xrs;
void *valp;
u_int32_t keylen = toku_dtoh32(le->keylen);
*len = toku_dtoh32(le->innermost_inserted_vallen);
if (num_xrs == 1)
valp = &le->u.comm.key_val[keylen];
else {
valp = &le->u.prov.key_val_xrs[keylen];
if (uxr_type_is_delete(le->u.prov.innermost_type)) {
valp = NULL;
*len = 0;
}
}
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
void *slow_valp;
u_int32_t slow_len;
if (uxr_is_insert(uxr)) {
slow_valp = uxr->valp;
slow_len = uxr->vallen;
}
else {
slow_valp = NULL;
slow_len = 0;
}
assert(slow_valp == le_latest_val(le));
assert(slow_len == le_latest_vallen(le));
assert(valp==slow_valp);
assert(*len==slow_len);
#endif
return valp;
}
void*
le_latest_val (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
void *rval;
u_int32_t keylen = toku_dtoh32(le->keylen);
if (num_xrs == 1)
rval = &le->u.comm.key_val[keylen];
else {
rval = &le->u.prov.key_val_xrs[keylen];
if (uxr_type_is_delete(le->u.prov.innermost_type))
rval = NULL;
}
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
void *slow_rval;
if (uxr_is_insert(uxr))
slow_rval = uxr->valp;
else
slow_rval = NULL;
assert(rval==slow_rval);
#endif
return rval;
}
u_int32_t
le_latest_vallen (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
u_int32_t rval = toku_dtoh32(le->innermost_inserted_vallen);
if (num_xrs > 1 && uxr_type_is_delete(le->u.prov.innermost_type))
rval = 0;
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_innermost_uxr(&ule);
u_int32_t slow_rval;
if (uxr_is_insert(uxr))
slow_rval = uxr->vallen;
else
slow_rval = 0;
assert(rval==slow_rval);
#endif
return rval;
}
//Return key and keylen unconditionally
void*
le_key_and_len (LEAFENTRY le, u_int32_t *len) {
u_int8_t num_xrs = le->num_xrs;
*len = toku_dtoh32(le->keylen);
void *keyp;
if (num_xrs == 1)
keyp = le->u.comm.key_val;
else
keyp = le->u.prov.key_val_xrs;
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
void *slow_keyp;
u_int32_t slow_len;
slow_keyp = ule.keyp;
slow_len = ule.keylen;
assert(slow_keyp == le_key(le));
assert(slow_len == le_keylen(le));
assert(keyp==slow_keyp);
assert(*len==slow_len);
#endif
return keyp;
}
void*
le_key (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
void *rval;
if (num_xrs == 1)
rval = le->u.comm.key_val;
else
rval = le->u.prov.key_val_xrs;
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
void *slow_rval = ule.keyp;
assert(rval==slow_rval);
#endif
return rval;
}
u_int32_t
le_keylen (LEAFENTRY le) {
u_int32_t rval = toku_dtoh32(le->keylen);
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
u_int32_t slow_rval = ule.keylen;
assert(rval==slow_rval);
#endif
return rval;
}
void*
le_innermost_inserted_val_and_len (LEAFENTRY le, u_int32_t *len) {
u_int8_t num_xrs = le->num_xrs;
void *valp;
u_int32_t keylen = toku_dtoh32(le->keylen);
*len = toku_dtoh32(le->innermost_inserted_vallen);
if (num_xrs == 1)
valp = &le->u.comm.key_val[keylen];
else
valp = &le->u.prov.key_val_xrs[keylen];
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
u_int8_t i;
for (i = ule.num_uxrs; i > 0; i--) {
if (uxr_is_insert(&ule.uxrs[i-1]))
break;
}
assert(i > 0);
i--;
UXR uxr = &ule.uxrs[i];
void *slow_valp;
u_int32_t slow_len;
slow_valp = uxr->valp;
slow_len = uxr->vallen;
assert(slow_valp == le_innermost_inserted_val(le));
assert(slow_len == le_innermost_inserted_vallen(le));
assert(valp==slow_valp);
assert(*len==slow_len);
#endif
return valp;
}
void*
le_innermost_inserted_val (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
void *rval;
u_int32_t keylen = toku_dtoh32(le->keylen);
if (num_xrs == 1)
rval = &le->u.comm.key_val[keylen];
else
rval = &le->u.prov.key_val_xrs[keylen];
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
u_int8_t i;
for (i = ule.num_uxrs; i > 0; i--) {
if (uxr_is_insert(&ule.uxrs[i-1]))
break;
}
assert(i > 0);
i--;
void *slow_rval = ule.uxrs[i].valp;
assert(rval==slow_rval);
#endif
return rval;
}
u_int32_t
le_innermost_inserted_vallen (LEAFENTRY le) {
u_int32_t rval = toku_dtoh32(le->innermost_inserted_vallen);
#if ULE_DEBUG
ULE_S ule;
le_unpack(&ule, le);
u_int8_t i;
for (i = ule.num_uxrs; i > 0; i--) {
if (uxr_is_insert(&ule.uxrs[i-1]))
break;
}
assert(i > 0);
i--;
u_int32_t slow_rval = ule.uxrs[i].vallen;
assert(rval==slow_rval);
#endif
return rval;
}
u_int64_t
le_outermost_uncommitted_xid (LEAFENTRY le) {
u_int8_t num_xrs = le->num_xrs;
TXNID rval;
if (num_xrs == 1)
rval = 0;
else
rval = toku_dtoh64(le->u.prov.xid_outermost_uncommitted);
#if ULE_DEBUG
assert(num_xrs);
ULE_S ule;
le_unpack(&ule, le);
TXNID slow_rval = 0;
if (ule.num_uxrs > 1)
slow_rval = ule.uxrs[1].xid;
assert(rval==slow_rval);
#endif
return rval;
}
//Optimization not required. This is a debug only function.
//Print a leafentry out in human-readable format
int
print_leafentry (FILE *outf, LEAFENTRY le) {
ULE_S ule;
le_unpack(&ule, le);
u_int8_t i;
assert(ule.num_uxrs > 0);
UXR uxr = &ule.uxrs[0];
if (!le) { printf("NULL"); return 0; }
fprintf(outf, "{key=");
toku_print_BYTESTRING(outf, ule.keylen, ule.keyp);
for (i = 0; i < ule.num_uxrs; i++) {
fprintf(outf, "\n%*s", i+1, " "); //Nested indenting
uxr = &ule.uxrs[i];
if (uxr_is_placeholder(uxr))
fprintf(outf, "P: xid=%016" PRIx64, uxr->xid);
else if (uxr_is_delete(uxr))
fprintf(outf, "D: xid=%016" PRIx64, uxr->xid);
else {
assert(uxr_is_insert(uxr));
fprintf(outf, "I: xid=%016" PRIx64 " val=", uxr->xid);
toku_print_BYTESTRING(outf, uxr->vallen, uxr->valp);
}
}
fprintf(outf, "}");
return 0;
}
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (ule_xxx) knows the structure of the unpacked
// leafentry and no other structure.
//
// ule constructor
// Note that transaction 0 is explicit in the ule
static void
ule_init_empty_ule(ULE ule, u_int32_t keylen, void * keyp) {
ule->keylen = keylen;
ule->keyp = keyp;
ule->num_uxrs = 1;
ule->uxrs[0] = committed_delete;
}
static inline u_int8_t
min_u8(u_int8_t a, u_int8_t b) {
u_int8_t rval = a < b ? a : b;
return rval;
}
///////////////////
// Implicit promotion logic:
//
// If the leafentry has already been promoted, there is nothing to do.
// We have two transaction stacks (one from message, one from leaf entry).
// We want to implicitly promote transactions newer than (but not including)
// the innermost common ancestor (ICA) of the two stacks of transaction ids. We
// know that this is the right thing to do because each transaction with an id
// greater (later) than the ICA must have been either committed or aborted.
// If it was aborted then we would have seen an abort message and removed the
// xid from the stack of transaction records. So any transaction still on the
// leaf entry stack must have been successfully promoted.
//
// After finding the ICA, promote transaction later than the ICA by copying
// value and type from innermost transaction record of leafentry to transaction
// record of ICA, keeping the transaction id of the ICA.
// Outermost xid is zero for both ule and xids<>
//
static void
ule_do_implicit_promotions(ULE ule, XIDS xids) {
//Optimization for (most) common case.
//No commits necessary if everything is already committed.
if (ule->num_uxrs > 1) {
u_int8_t max_index = min_u8(ule->num_uxrs, xids_get_num_xids(xids)) - 1;
u_int8_t ica_index = max_index;
u_int8_t index;
for (index = 1; index <= max_index; index++) { //xids at index 0 are defined to be equal.
TXNID current_msg_xid = xids_get_xid(xids, index);
TXNID current_ule_xid = ule_get_xid(ule, index);
if (current_msg_xid != current_ule_xid) {
//ica is innermost transaction with matching xids.
ica_index = index - 1;
break;
}
}
//If ica is the innermost uxr in the leafentry, no commits are necessary.
if (ica_index < ule->num_uxrs - 1) {
ule_promote_innermost_to_index(ule, ica_index);
}
}
}
// Purpose is to promote the value (and type) of the innermost transaction
// record to the uxr at the specified index (keeping the txnid of the uxr at
// specified index.)
static void
ule_promote_innermost_to_index(ULE ule, u_int8_t index) {
assert(ule->num_uxrs - 1 > index);
UXR old_innermost_uxr = ule_get_innermost_uxr(ule);
assert(!uxr_is_placeholder(old_innermost_uxr));
TXNID new_innermost_xid = ule->uxrs[index].xid;
ule->num_uxrs = index; //Discard old uxr at index (and everything inner)
if (uxr_is_delete(old_innermost_uxr)) {
ule_push_delete_uxr(ule, new_innermost_xid);
}
else {
ule_push_insert_uxr(ule,
new_innermost_xid,
old_innermost_uxr->vallen,
old_innermost_uxr->valp);
}
}
///////////////////
// All ule_apply_xxx operations are done after implicit promotions,
// so the innermost transaction record in the leafentry is the ICA.
//
// Purpose is to apply an insert message to this leafentry:
static void
ule_apply_insert(ULE ule, XIDS xids, u_int32_t vallen, void * valp) {
ule_prepare_for_new_uxr(ule, xids);
TXNID this_xid = xids_get_innermost_xid(xids); // xid of transaction doing this insert
ule_push_insert_uxr(ule, this_xid, vallen, valp);
}
// Purpose is to apply a delete message to this leafentry:
static void
ule_apply_delete(ULE ule, XIDS xids) {
ule_prepare_for_new_uxr(ule, xids);
TXNID this_xid = xids_get_innermost_xid(xids); // xid of transaction doing this delete
ule_push_delete_uxr(ule, this_xid);
}
// First, discard anything done earlier by this transaction.
// Then, add placeholders if necessary. This transaction may be nested within
// outer transactions that are newer than then newest (innermost) transaction in
// the leafentry. If so, record those outer transactions in the leafentry
// with placeholders.
static void
ule_prepare_for_new_uxr(ULE ule, XIDS xids) {
TXNID this_xid = xids_get_innermost_xid(xids);
if (ule_get_innermost_xid(ule) == this_xid)
ule_remove_innermost_uxr(ule);
else
ule_add_placeholders(ule, xids);
}
// Purpose is to apply an abort message to this leafentry.
// If the aborted transaction (the transaction whose xid is the innermost xid
// in the id stack passed in the message), has not modified this leafentry,
// then there is nothing to be done.
// If this transaction did modify the leafentry, then undo whatever it did (by
// removing the transaction record (uxr) and any placeholders underneath.
// Remember, the innermost uxr can only be an insert or a delete, not a placeholder.
static void
ule_apply_abort(ULE ule, XIDS xids) {
TXNID this_xid = xids_get_innermost_xid(xids); // xid of transaction doing this abort
assert(this_xid!=0);
if (ule_get_innermost_xid(ule) == this_xid) {
assert(ule->num_uxrs>1);
ule_remove_innermost_uxr(ule);
ule_remove_innermost_placeholders(ule);
}
assert(ule->num_uxrs > 0);
}
// Purpose is to apply a commit message to this leafentry.
// If the committed transaction (the transaction whose xid is the innermost xid
// in the id stack passed in the message), has not modified this leafentry,
// then there is nothing to be done.
// Also, if there are no uncommitted transaction records there is nothing to do.
// If this transaction did modify the leafentry, then promote whatever it did.
// Remember, the innermost uxr can only be an insert or a delete, not a placeholder.
void ule_apply_commit(ULE ule, XIDS xids) {
TXNID this_xid = xids_get_innermost_xid(xids); // xid of transaction committing
assert(this_xid!=0);
if (ule_get_innermost_xid(ule) == this_xid) {
//ule->uxrs[ule->num_uxrs-1] is the innermost (this transaction)
//ule->uxrs[ule->num_uxrs-2] is the 2nd innermost
assert(ule->num_uxrs > 1);
//We want to promote the innermost uxr one level out.
ule_promote_innermost_to_index(ule, ule->num_uxrs-2);
}
}
///////////////////
// Helper functions called from the functions above:
//
// Purpose is to record an insert for this transaction (and set type correctly).
static void
ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp) {
UXR uxr = ule_get_first_empty_uxr(ule);
uxr->xid = xid;
uxr->vallen = vallen;
uxr->valp = valp;
uxr->type = XR_INSERT;
ule->num_uxrs++;
}
// Purpose is to record a delete for this transaction. If this transaction
// is the root transaction, then truly delete the leafentry by marking the
// ule as empty.
static void
ule_push_delete_uxr(ULE ule, TXNID xid) {
UXR uxr = ule_get_first_empty_uxr(ule);
uxr->xid = xid;
uxr->type = XR_DELETE;
ule->num_uxrs++;
}
// Purpose is to push a placeholder on the top of the leafentry's transaction stack.
static void
ule_push_placeholder_uxr(ULE ule, TXNID xid) {
UXR uxr = ule_get_first_empty_uxr(ule);
uxr->xid = xid;
uxr->type = XR_PLACEHOLDER;
ule->num_uxrs++;
}
// Return innermost transaction record.
static UXR
ule_get_innermost_uxr(ULE ule) {
assert(ule->num_uxrs > 0);
UXR rval = &(ule->uxrs[ule->num_uxrs - 1]);
return rval;
}
// Return first empty transaction record
static UXR
ule_get_first_empty_uxr(ULE ule) {
assert(ule->num_uxrs < MAX_TRANSACTION_RECORDS);
UXR rval = &(ule->uxrs[ule->num_uxrs]);
return rval;
}
// Remove the innermost transaction (pop the leafentry's stack), undoing
// whatever the innermost transaction did.
static void
ule_remove_innermost_uxr(ULE ule) {
//It is possible to remove the committed delete at first insert.
assert(ule->num_uxrs > 0);
ule->num_uxrs--;
}
static TXNID
ule_get_innermost_xid(ULE ule) {
TXNID rval = ule_get_xid(ule, ule->num_uxrs - 1);
return rval;
}
static TXNID
ule_get_xid(ULE ule, u_int8_t index) {
assert(index < ule->num_uxrs);
TXNID rval = ule->uxrs[index].xid;
return rval;
}
// Purpose is to remove any placeholders from the top of the leaf stack (the
// innermost recorded transactions), if necessary. This function is idempotent.
// It makes no logical sense for a placeholder to be the innermost recorded
// transaction record, so placeholders at the top of the stack are not legal.
static void
ule_remove_innermost_placeholders(ULE ule) {
UXR uxr = ule_get_innermost_uxr(ule);
while (uxr_is_placeholder(uxr)) {
assert(ule->num_uxrs > 1); // outermost is committed, cannot be placeholder
ule_remove_innermost_uxr(ule);
uxr = ule_get_innermost_uxr(ule);
}
}
// Purpose is to add placeholders to the top of the leaf stack (the innermost
// recorded transactions), if necessary. This function is idempotent.
// Note, after placeholders are added, an insert or delete will be added. This
// function temporarily leaves the transaction stack in an illegal state (having
// placeholders on top).
static void
ule_add_placeholders(ULE ule, XIDS xids) {
//Placeholders can be placed on top of the committed uxr.
assert(ule->num_uxrs > 0);
TXNID ica_xid = ule_get_innermost_xid(ule); // xid of ica
TXNID this_xid = xids_get_innermost_xid(xids); // xid of this transaction
if (ica_xid != this_xid) { // if this transaction is the ICA, don't push any placeholders
u_int8_t index = xids_find_index_of_xid(xids, ica_xid) + 1; // Get index of next inner transaction after ICA
TXNID current_msg_xid = xids_get_xid(xids, index);
while (current_msg_xid != this_xid) { // Placeholder for each transaction before this transaction
ule_push_placeholder_uxr(ule, current_msg_xid);
index++;
current_msg_xid = xids_get_xid(xids, index);
}
}
}
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (uxr_xxx) understands uxr and nothing else.
//
static inline BOOL
uxr_type_is_insert(u_int8_t type) {
BOOL rval = (BOOL)(type == XR_INSERT);
return rval;
}
static inline BOOL
uxr_is_insert(UXR uxr) {
return uxr_type_is_insert(uxr->type);
}
static inline BOOL
uxr_type_is_delete(u_int8_t type) {
BOOL rval = (BOOL)(type == XR_DELETE);
return rval;
}
static inline BOOL
uxr_is_delete(UXR uxr) {
return uxr_type_is_delete(uxr->type);
}
static inline BOOL
uxr_type_is_placeholder(u_int8_t type) {
BOOL rval = (BOOL)(type == XR_PLACEHOLDER);
return rval;
}
static inline BOOL
uxr_is_placeholder(UXR uxr) {
return uxr_type_is_placeholder(uxr->type);
}
#ifdef IMPLICIT_PROMOTION_ON_QUERY
////////////////////////////////////////////////////////////////////////////////
// Functions here are responsible for implicit promotions on queries.
//
// Purpose is to promote any transactions in this leafentry by detecting if
// transactions that have modified it have been committed.
// During a query, the read lock for the leaf entry is not necessarily taken.
// (We use a locking regime that tests the lock after the read.)
// If a transaction unrelated to the transaction issuing the query is writing
// to this leafentry (possible because we didn't take the read lock), then that
// unrelated transaction is alive and there should be no implicit promotion.
// So any implicit promotions done during the query must be based solely on
// whether the transactions whose xids are recorded in the leafentry are still
// open. (An open transaction is one that has not committed or aborted.)
// Our logic is:
// If the innermost transaction in the leafentry is definitely open, then no
// implicit promotions are necessary (or possible). This is a fast test.
// Otherwise, scan from inner to outer to find the innermost uncommitted
// transaction. Then promote the innermost transaction to the transaction
// record of the innermost open (uncommitted) transaction.
// Transaction id of zero is always considered open for this purpose.
leafentry do_implicit_promotions_on_query(le) {
innermost_xid = le_get_innermost_xid(le);
// if innermost transaction still open, nothing to promote
if (!transaction_open(innermost_xid)) {
ule = unpack(le);
// scan outward starting with next outer transaction
for (index = ule->num_uxrs - 2; index > 0; index--) {
xid = ule_get_xid(ule, index);
if (transaction_open(xid)) break;
}
promote_innermost_to_index(ule, index);
le = le_pack(ule);
}
return le;
}
// Examine list of open transactions, return true if transaction is still open.
// Transaction zero is always open.
//
// NOTE: Old code already does implicit promotion of provdel on query,
// and that code uses some equivalent of transaction_open().
//
bool transaction_open(TXNID xid) {
rval = TRUE;
if (xid != 0) {
//TODO: Logic
}
return rval;
}
#endif
// Wrapper code to support backwards compatibility with version 10 (until we don't want it).
// These wrappers should be removed if/when we remove support for version 10 leafentries.
#include "backwards_10.h"
void
toku_upgrade_ule_init_empty_ule(ULE ule, u_int32_t keylen, void * keyp) {
ule_init_empty_ule(ule, keylen, keyp);
}
void
toku_upgrade_ule_remove_innermost_uxr(ULE ule) {
ule_remove_innermost_uxr(ule);
}
void
toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp) {
ule_push_insert_uxr(ule, xid, vallen, valp);
}
void
toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid) {
ule_push_delete_uxr(ule, xid);
}