Monday, May 5, 2014

Investigating nodetool cleanup in Cassandra

In this article, we are going to study nodetool cleanup in Cassandra 1.0.8. The nodetool help describes it as: "Run cleanup on one or more column family." That description is clearly too general to tell us what actually happens.

So we will trace the code to understand the cleanup operation.
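Before diving into the source, note that nodetool talks to the Cassandra daemon over JMX. As a rough sketch (not code from this article), cleanup could be triggered programmatically like this; the MBean name and the forceTableCleanup operation are assumptions based on the StorageServiceMBean of this Cassandra generation, and the host, port, keyspace ("my_keyspace") and column family ("my_cf") are placeholders:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Minimal sketch: trigger cleanup over JMX, which is roughly what nodetool does under the hood.
public class CleanupTrigger
{
    public static void main(String[] args) throws Exception
    {
        // default Cassandra JMX port is 7199
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
            // assumed operation: forceTableCleanup(keyspace, columnFamilies...)
            mbs.invoke(storageService,
                       "forceTableCleanup",
                       new Object[] { "my_keyspace", new String[] { "my_cf" } },
                       new String[] { "java.lang.String", "[Ljava.lang.String;" });
        }
        finally
        {
            connector.close();
        }
    }
}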

Cleanup is actually a type of compaction. Tracing the code down, CompactionManager.doCleanupCompaction() is the method that does the actual cleanup work:
/**
 * This function goes over each file and removes the keys that the node is not responsible for
 * and only keeps keys that this node is responsible for.
 *
 * @throws IOException
 */
private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, NodeId.OneShotRenewer renewer) throws IOException
{
    assert !cfs.isIndex();
    Table table = cfs.table;
    Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
    boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
    if (ranges.isEmpty())
    {
        logger.info("Cleanup cannot run before a node has joined the ring");
        return;
    }

    for (SSTableReader sstable : sstables)
    {
        CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), false);
        long startTime = System.currentTimeMillis();

        long totalkeysWritten = 0;

        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
                                               (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
        if (logger.isDebugEnabled())
            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);

        SSTableWriter writer = null;
        SSTableReader newSstable = null;

        logger.info("Cleaning up " + sstable);
        // Calculate the expected compacted filesize
        long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
        String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
        if (compactionFileLocation == null)
            throw new IOException("disk full");

        SSTableScanner scanner = sstable.getDirectScanner();
        Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
        List<IColumn> indexedColumnsInRow = null;

        CleanupInfo ci = new CleanupInfo(sstable, scanner);
        executor.beginCompaction(ci);
        try
        {
            while (scanner.hasNext())
            {
                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                if (Range.isTokenInRanges(row.getKey().token, ranges))
                {
                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                    if (compactedRow.isEmpty())
                        continue;
                    writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
                    writer.append(compactedRow);
                    totalkeysWritten++;
                }
                else
                {
                    cfs.invalidateCachedRow(row.getKey());

                    if (!indexedColumns.isEmpty() || isCommutative)
                    {
                        if (indexedColumnsInRow != null)
                            indexedColumnsInRow.clear();

                        while (row.hasNext())
                        {
                            IColumn column = row.next();
                            if (column instanceof CounterColumn)
                                renewer.maybeRenew((CounterColumn) column);
                            if (indexedColumns.contains(column.name()))
                            {
                                if (indexedColumnsInRow == null)
                                    indexedColumnsInRow = new ArrayList<IColumn>();

                                indexedColumnsInRow.add(column);
                            }
                        }

                        if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                        {
                            // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
                            Table.switchLock.readLock().lock();
                            try
                            {
                                cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
                            }
                            finally
                            {
                                Table.switchLock.readLock().unlock();
                            }
                        }
                    }
                }
            }
            if (writer != null)
                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
        }
        catch (Exception e)
        {
            if (writer != null)
                writer.abort();
            throw FBUtilities.unchecked(e);
        }
        finally
        {
            scanner.close();
            executor.finishCompaction(ci);
        }

        List<SSTableReader> results = new ArrayList<SSTableReader>();
        if (newSstable != null)
        {
            results.add(newSstable);

            String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
            long dTime = System.currentTimeMillis() - startTime;
            long startsize = sstable.onDiskLength();
            long endsize = newSstable.onDiskLength();
            double ratio = (double)endsize / (double)startsize;
            logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
        }

        // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
        cfs.indexManager.flushIndexesBlocking();

        cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
    }
}

From this method, it is clear that keys (that is, rows) that do not belong to this node get removed. The following summarizes what this method actually does:

  • for this column family's table, get the local ranges this node is responsible for; if there are none (the node has not joined the ring), cleanup aborts.

  • for each sstable, expectedRangeFileSize is estimated as half of that sstable's expected compacted file size, and is used to pick a data file location with enough free space.

  • for each sstable, a loop performs the following tasks:



  1. check whether there is enough disk space for the new compacted sstable; if not, an IOException ("disk full") is thrown.

  2. the executor begins a cleanup compaction (executor.beginCompaction).

  3. iterate over the rows in this sstable and check whether each row key's token is within the ranges this node is responsible for (see the sketch after this summary).

  4. if it is, get the compacted row and append it to the SSTableWriter.

  5. if it is not, the cached row is invalidated, any secondary index entries for the row are removed, and counter columns have their node id renewed.



  • the executor marks the cleanup compaction as finished (executor.finishCompaction).

  • cleanup compaction information (bytes before and after, keys written, time taken) is written to the logger.

  • indexes are flushed to disk so their tombstones are not lost on a restart (they are not written to the commitlog).

  • the old sstable is replaced by the new, cleaned sstable.
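To make the core idea concrete, here is a small, self-contained sketch of what cleanup effectively does: keep only the rows whose key token falls inside the node's local ranges and drop everything else. The Row and TokenRange classes below are simplified stand-ins for illustration, not the real Cassandra classes, and the range check ignores ring wrap-around.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for an sstable row: just a key token and some payload.
class Row
{
    final long token;
    final byte[] data;

    Row(long token, byte[] data) { this.token = token; this.data = data; }
}

// Simplified stand-in for a token range owned by this node (no wrap-around handling).
class TokenRange
{
    final long left, right;

    TokenRange(long left, long right) { this.left = left; this.right = right; }

    boolean contains(long token) { return token > left && token <= right; }
}

public class CleanupSketch
{
    // Keep only the rows whose token falls in one of the node's local ranges,
    // mirroring the Range.isTokenInRanges() check in doCleanupCompaction().
    static List<Row> cleanup(List<Row> sstableRows, Collection<TokenRange> localRanges)
    {
        List<Row> kept = new ArrayList<Row>();
        for (Row row : sstableRows)
        {
            for (TokenRange range : localRanges)
            {
                if (range.contains(row.token))
                {
                    kept.add(row);
                    break;
                }
            }
        }
        return kept;
    }
}

The real implementation additionally rewrites the kept rows into a new sstable and, for the dropped rows, invalidates caches and deletes secondary index entries, as described above.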


That's it about Cassandra cleanup. If you learned something and would like to contribute back, please go to the donation page for more information.
