Continuing from where we left off in the previous article, we are in the method CompactionTask.execute(), in the snippet below:
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                                : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
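Iterators.filter(iter, Predicates.notNull()) is Guava's way of wrapping an iterator so that null elements are skipped. Here it drops every row that the compaction iterator hands back as null. For readers less familiar with Guava, below is a minimal plain-Java sketch of the same behaviour. NotNullIterator is a hypothetical class written for illustration, not part of Cassandra or Guava. It looks ahead and buffers the next non-null element.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: a plain-Java equivalent of
// Guava's Iterators.filter(iter, Predicates.notNull()).
class NotNullIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private T next; // buffered non-null element, or null if none is buffered yet

    NotNullIterator(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        // Skip over nulls until a non-null element is buffered or the source is exhausted
        while (next == null && source.hasNext()) {
            next = source.next();
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null; // consume the buffered element
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row1", null, "row3", null);
        Iterator<String> it = new NotNullIterator<>(rows.iterator());
        while (it.hasNext()) {
            System.out.println(it.next()); // prints row1, then row3
        }
    }
}
```

Buffering one element is what lets hasNext() answer truthfully without losing data: it must peek past any nulls to know whether a real row remains.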
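Calling ci.iterator() returns a merge iterator over the scanners of the sstables being compacted, built with a new Reducer. For each row key, the Reducer collects the versions of that row from every sstable (reduce()) and merges them into one compacted row (getReduced()). If the merged row is empty, the Reducer invalidates the row in the row cache and returns null, so the Predicates.notNull() filter above drops it and it is never written to the new sstable. Otherwise, it removes deleted columns from the cached copy of the row, if the row is cached.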
protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
{
    protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();

    public void reduce(IColumnIterator current)
    {
        rows.add((SSTableIdentityIterator) current);
    }

    protected AbstractCompactedRow getReduced()
    {
        assert !rows.isEmpty();
        try
        {
            AbstractCompactedRow compactedRow = controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
            if (compactedRow.isEmpty())
            {
                controller.invalidateCachedRow(compactedRow.key);
                return null;
            }
            else
            {
                // If the row is cached, we call removeDeleted on it to have coherent query returns. However it would look
                // like some deleted columns lived longer than gc_grace + compaction. This can also free up a large amount of
                // memory on long-running instances
                controller.removeDeletedInCache(compactedRow.key);
            }
            return compactedRow;
        }
        finally
        {
            rows.clear();
            if ((row++ % 1000) == 0)
            {
                long n = 0;
                for (SSTableScanner scanner : scanners)
                    n += scanner.getFilePointer();
                bytesRead = n;
                throttle.throttle(bytesRead);
            }
        }
    }
}
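The reduce()/getReduced() contract above can be illustrated outside Cassandra. The sketch below is a simplified model under stated assumptions, not Cassandra's MergeIterator. Each hypothetical "sstable" is a sorted map from row key to its set of live column names. A PriorityQueue performs the k-way merge and feeds every version of one key to reduce(). getReduced() then returns the union of columns, or null when nothing survives, mirroring the return null for an empty compacted row.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

class ReducerSketch {

    // Hypothetical reducer: buffers every version of one row key, then merges them once.
    static class Reducer {
        private final List<Set<String>> versions = new ArrayList<>();

        void reduce(Set<String> version) {
            versions.add(version);
        }

        // Union of live columns, or null when nothing survives (like an empty compacted row).
        Set<String> getReduced() {
            try {
                Set<String> merged = new TreeSet<>();
                for (Set<String> v : versions) {
                    merged.addAll(v);
                }
                return merged.isEmpty() ? null : merged;
            } finally {
                versions.clear(); // reset for the next key, like rows.clear() in the Reducer
            }
        }
    }

    // Current head entry of one source, plus the iterator it came from.
    static class Head {
        final Map.Entry<String, Set<String>> entry;
        final Iterator<Map.Entry<String, Set<String>>> rest;

        Head(Map.Entry<String, Set<String>> entry, Iterator<Map.Entry<String, Set<String>>> rest) {
            this.entry = entry;
            this.rest = rest;
        }
    }

    // k-way merge over sorted "sstables"; rows whose reduced value is null are skipped.
    static Map<String, Set<String>> compact(List<SortedMap<String, Set<String>>> sstables) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> a.entry.getKey().compareTo(b.entry.getKey()));
        for (SortedMap<String, Set<String>> table : sstables) {
            Iterator<Map.Entry<String, Set<String>>> it = table.entrySet().iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        Map<String, Set<String>> output = new LinkedHashMap<>();
        Reducer reducer = new Reducer();
        while (!heap.isEmpty()) {
            String key = heap.peek().entry.getKey();
            // Feed every version of this key (one per source) to the reducer
            while (!heap.isEmpty() && heap.peek().entry.getKey().equals(key)) {
                Head head = heap.poll();
                reducer.reduce(head.entry.getValue());
                if (head.rest.hasNext()) {
                    heap.add(new Head(head.rest.next(), head.rest));
                }
            }
            Set<String> merged = reducer.getReduced();
            if (merged != null) { // plays the role of the Predicates.notNull() filter
                output.put(key, merged);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        SortedMap<String, Set<String>> t1 = new TreeMap<>();
        t1.put("a", new TreeSet<>(List.of("c1")));
        t1.put("b", new TreeSet<>()); // row with nothing left: dropped
        SortedMap<String, Set<String>> t2 = new TreeMap<>();
        t2.put("a", new TreeSet<>(List.of("c2")));
        t2.put("c", new TreeSet<>(List.of("c3")));
        System.out.println(compact(List.of(t1, t2))); // {a=[c1, c2], c=[c3]}
    }
}
```

Real compaction merges columns by timestamp and applies tombstones rather than taking a plain union. The union here only keeps the example short while preserving the shape of the merge loop.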
Removing deleted data from the cached row works along similar lines. Below is the method that removes deleted and expired columns from a standard (non-super) column family:
private static void removeDeletedStandard(ColumnFamily cf, int gcBefore)
{
    Iterator<IColumn> iter = cf.iterator();
    while (iter.hasNext())
    {
        IColumn c = iter.next();
        ByteBuffer cname = c.name();
        // remove columns if
        // (a) the column itself is tombstoned or
        // (b) the CF is tombstoned and the column is not newer than it
        //
        // Note that we need the inequality below for case (a) to be strict for expiring columns
        // to work correctly -- see the comment in ExpiringColumn.isMarkedForDelete().
        if ((c.isMarkedForDelete() && c.getLocalDeletionTime() < gcBefore)
            || c.timestamp() <= cf.getMarkedForDeleteAt())
        {
            iter.remove();
        }
    }
}
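So that's pretty straightforward. A column is removed either if it is a tombstone (including an expired column) whose local deletion time is older than gcBefore, or if its timestamp is not newer than the time at which the whole column family was marked deleted. And, as seen in the Reducer, a row whose compacted result is empty is dropped altogether.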
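The two removal conditions can be tried in isolation. The sketch below replaces IColumn with a hypothetical Column class holding only what the check reads: a write timestamp, a tombstone flag standing in for isMarkedForDelete(), and a local deletion time in seconds. It applies the same condition as removeDeletedStandard to a plain list. It treats expired columns simply as tombstones and ignores ExpiringColumn's own TTL logic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PurgeSketch {

    // Hypothetical stand-in for IColumn: only the fields the purge check reads.
    static final class Column {
        final String name;
        final long timestamp;        // write timestamp
        final boolean tombstone;     // stands in for isMarkedForDelete()
        final int localDeletionTime; // seconds; only meaningful for tombstones

        Column(String name, long timestamp, boolean tombstone, int localDeletionTime) {
            this.name = name;
            this.timestamp = timestamp;
            this.tombstone = tombstone;
            this.localDeletionTime = localDeletionTime;
        }
    }

    // Same condition as removeDeletedStandard; pass Long.MIN_VALUE when the CF is not deleted.
    static void removeDeleted(List<Column> columns, long cfMarkedForDeleteAt, int gcBefore) {
        Iterator<Column> iter = columns.iterator();
        while (iter.hasNext()) {
            Column c = iter.next();
            if ((c.tombstone && c.localDeletionTime < gcBefore)   // (a) tombstone past gc_grace
                    || c.timestamp <= cfMarkedForDeleteAt) {       // (b) shadowed by CF deletion
                iter.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Column> cols = new ArrayList<>();
        cols.add(new Column("old_tombstone", 100, true, 50));           // purged by (a)
        cols.add(new Column("new_tombstone", 100, true, 500));          // kept: still within gc_grace
        cols.add(new Column("shadowed", 10, false, Integer.MAX_VALUE)); // purged by (b)
        cols.add(new Column("live", 300, false, Integer.MAX_VALUE));    // kept
        removeDeleted(cols, 20L, 200);
        for (Column c : cols) {
            System.out.println(c.name); // prints new_tombstone, then live
        }
    }
}
```

Note that a recent tombstone survives: it must stay on disk until gcBefore passes so that replicas which missed the delete do not resurrect the column.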
Last but not least, if you enjoyed reading this and learned something, please remember to donate too.