Saturday, April 12, 2014

Investigating Cassandra 1.0.8 compaction

So what happens when you trigger a compaction via nodetool? In a nutshell, it goes through a series of low-level Java calls.

The execution path starts in NodeCmd.java and runs through NodeProbe.java, StorageServiceMBean.java, StorageService.java, ColumnFamilyStore.java, CompactionManager.java, AbstractCompactionTask.java and finally CompactionTask.java.

Once the NodeProbe object is established, the method forceTableCompaction(...) is called. Within NodeProbe there is a reference to StorageServiceMBean, which is the JMX bean interface implemented by the class StorageService.
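
Since nodetool is just a thin JMX client, you can reproduce what NodeProbe effectively does with plain JMX code. The sketch below is only an illustration: the host, port (7199 is the default JMX port), keyspace and column family names are placeholder values, and error handling is omitted.

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.cassandra.service.StorageServiceMBean;

public class TriggerCompaction
{
    public static void main(String[] args) throws Exception
    {
        // connect to the JMX agent of a Cassandra node
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection connection = connector.getMBeanServerConnection();

        // look up the StorageService MBean and call the same operation nodetool uses
        ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
        StorageServiceMBean proxy = JMX.newMBeanProxy(connection, name, StorageServiceMBean.class);
        proxy.forceTableCompaction("MyKeyspace", "MyColumnFamily");

        connector.close();
    }
}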

What forceTableCompaction(...) does is iterate over the column families and start a major compaction on each of them. Code snippet below:
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
    for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
    {
        cfStore.forceMajorCompaction();
    }
}

So it is pretty clear that the execution retrieves the valid column families and calls forceMajorCompaction() on each of them. Within forceMajorCompaction(), the ColumnFamilyStore object passes itself to the CompactionManager singleton to perform an operation known as a maximal (major) compaction.
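
Inside ColumnFamilyStore, forceMajorCompaction() is essentially a one-line delegation to the CompactionManager singleton, which in turn wraps the submitMaximal(...) call shown next and waits for it to finish. The snippet below is only a simplified rendering of the 1.0.x code, with the gcBefore computation left to a getDefaultGcBefore(...) helper.

// ColumnFamilyStore (simplified): the column family hands itself over to the
// CompactionManager singleton and blocks until the maximal compaction is done.
public void forceMajorCompaction() throws InterruptedException, ExecutionException
{
    CompactionManager.instance.performMaximal(this);
}

// CompactionManager (simplified): performMaximal wraps submitMaximal(...) below
// and waits on the returned Future.
public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
    submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
}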

Within the CompactionManager class, the cfStore object is processed asynchronously: it is wrapped in a Callable and submitted to the compaction executor. To explain this better, let's read the general compaction framework below:
public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
    Callable<Object> callable = new Callable<Object>()
    {
        public Object call() throws IOException
        {
            // acquire the write lock long enough to schedule all sstables
            compactionLock.writeLock().lock();
            try
            {
                if (!cfStore.isValid())
                    return this;
                AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                {
                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                        return this;
                    try
                    {
                        // downgrade the lock acquisition
                        compactionLock.readLock().lock();
                        compactionLock.writeLock().unlock();
                        try
                        {
                            return task.execute(executor);
                        }
                        finally
                        {
                            compactionLock.readLock().unlock();
                        }
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
            }
            finally
            {
                // we probably already downgraded
                if (compactionLock.writeLock().isHeldByCurrentThread())
                    compactionLock.writeLock().unlock();
            }
            return this;
        }
    };
    return executor.submit(callable);
}

To summarize:

  • the compaction write lock is acquired, and later downgraded to a read lock before the task runs (see the sketch after this list).

  • the cfStore object is checked again to ensure it is still valid.

  • the compaction strategy is retrieved from the cfStore object.

  • the SSTables are marked for compaction.

  • the task is executed on the CompactionExecutor.
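
The write-lock-then-downgrade dance in the snippet above is the standard ReentrantReadWriteLock downgrade idiom: the read lock is acquired while the write lock is still held, and only then is the write lock released, so no other writer can slip in between scheduling and execution. A minimal standalone sketch of the idiom (independent of Cassandra) looks like this:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockDowngradeExample
{
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args)
    {
        lock.writeLock().lock();        // exclusive: schedule / mark the work here
        try
        {
            // ... mark sstables for compaction ...

            lock.readLock().lock();     // downgrade: grab the read lock while still holding the write lock
            lock.writeLock().unlock();  // readers may now proceed, but no writer can
            try
            {
                // ... run the long compaction task under the read lock ...
            }
            finally
            {
                lock.readLock().unlock();
            }
        }
        finally
        {
            // mirrors the Cassandra code: release the write lock only if we still hold it
            if (lock.writeLock().isHeldByCurrentThread())
                lock.writeLock().unlock();
        }
    }
}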


There are two compaction strategies in this version: SizeTieredCompactionStrategy and LeveledCompactionStrategy. The rest of this discussion is based on SizeTieredCompactionStrategy.
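
For SizeTieredCompactionStrategy, the basic idea is to group sstables into buckets of roughly similar size and to compact a bucket once it contains enough members (min_compaction_threshold, default 4). The snippet below is only a simplified illustration of that bucketing idea, not the actual Cassandra implementation; the 0.5/1.5 bounds mirror the strategy's default bucketing factors.

import java.util.ArrayList;
import java.util.List;

public class SizeTieredBucketsSketch
{
    // Group sstable sizes into buckets where each size stays within 0.5x..1.5x of the
    // bucket's running average -- the notion of "similar enough to compact together".
    static List<List<Long>> buckets(List<Long> sstableSizes)
    {
        List<List<Long>> buckets = new ArrayList<List<Long>>();
        List<Long> averages = new ArrayList<Long>();    // running average size per bucket

        for (Long size : sstableSizes)
        {
            boolean placed = false;
            for (int i = 0; i < buckets.size(); i++)
            {
                long avg = averages.get(i);
                if (size > avg * 0.5 && size < avg * 1.5)
                {
                    List<Long> bucket = buckets.get(i);
                    bucket.add(size);
                    averages.set(i, (avg * (bucket.size() - 1) + size) / bucket.size());
                    placed = true;
                    break;
                }
            }
            if (!placed)
            {
                List<Long> bucket = new ArrayList<Long>();
                bucket.add(size);
                buckets.add(bucket);
                averages.add(size);
            }
        }
        return buckets;
    }

    public static void main(String[] args)
    {
        List<Long> sizes = new ArrayList<Long>();
        for (long s : new long[] { 10L, 12L, 11L, 50L, 55L, 300L })
            sizes.add(s);
        // prints [[10, 12, 11], [50, 55], [300]]; a bucket becomes a compaction
        // candidate once it holds min_compaction_threshold sstables
        System.out.println(buckets(sizes));
    }
}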

The real compaction work is done in CompactionTask.execute(...):
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
    // The collection of sstables passed may be empty (but not null); even if
    // it is not empty, it may compact down to nothing if all rows are deleted.
    assert sstables != null;

    Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
    if (!isCompactionInteresting(toCompact))
        return 0;

    if (compactionFileLocation == null)
        compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    if (partialCompactionsAcceptable())
    {
        // If the compaction file path is null that means we have no space left for this compaction.
        // Try again w/o the largest one.
        if (compactionFileLocation == null)
        {
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all
                // the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }

        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact even the two smallest files, aborting");
            return 0;
        }
    }

    if (DatabaseDescriptor.isSnapshotBeforeCompaction())
        cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

    // sanity check: all sstables must belong to the same cfs
    for (SSTableReader sstable : toCompact)
        assert sstable.descriptor.cfname.equals(cfs.columnFamily);

    CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
    // new sstables from flush can be added during a compaction, but only the compaction can remove them,
    // so in our single-threaded compaction world this is a valid way of determining if we're compacting
    // all the sstables (that existed when we started)
    logger.info("Compacting {}", toCompact);

    long startTime = System.currentTimeMillis();
    long totalkeysWritten = 0;

    long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
    long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
    long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
    if (logger.isDebugEnabled())
        logger.debug("Expected bloom filter size : " + keysPerSSTable);

    AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                  ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                                  : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
    CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
    Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
    Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

    // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
    // replace the old entries. Track entries to preheat here until then.
    Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

    Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
    Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

    if (collector != null)
        collector.beginCompaction(ci);
    try
    {
        if (!nni.hasNext())
        {
            // don't mark compacted in the finally block, since if there _is_ nondeleted data,
            // we need to sync it (via closeAndOpen) first, so there is no period during which
            // a crash could cause data loss.
            cfs.markCompacted(toCompact);
            return 0;
        }

        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
        writers.add(writer);
        while (nni.hasNext())
        {
            AbstractCompactedRow row = nni.next();
            if (row.isEmpty())
                continue;

            long position = writer.append(row);
            totalkeysWritten++;

            if (DatabaseDescriptor.getPreheatKeyCache())
            {
                for (SSTableReader sstable : toCompact)
                {
                    if (sstable.getCachedPosition(row.key, false) != null)
                    {
                        cachedKeys.put(row.key, position);
                        break;
                    }
                }
            }
            if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
            {
                SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                cachedKeyMap.put(toIndex, cachedKeys);
                sstables.add(toIndex);
                if (nni.hasNext())
                {
                    writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                    writers.add(writer);
                    cachedKeys = new HashMap<DecoratedKey, Long>();
                }
            }
        }
    }
    catch (Exception e)
    {
        for (SSTableWriter writer : writers)
            writer.abort();
        throw FBUtilities.unchecked(e);
    }
    finally
    {
        iter.close();
        if (collector != null)
            collector.finishCompaction(ci);
    }

    cfs.replaceCompactedSSTables(toCompact, sstables);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
        SSTableReader key = ssTableReaderMapEntry.getKey();
        for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
            key.cacheKey(entry.getKey(), entry.getValue());
    }

    long dTime = System.currentTimeMillis() - startTime;
    long startsize = SSTable.getTotalBytes(toCompact);
    long endsize = SSTable.getTotalBytes(sstables);
    double ratio = (double)endsize / (double)startsize;

    StringBuilder builder = new StringBuilder();
    builder.append("[");
    for (SSTableReader reader : sstables)
        builder.append(reader.getFilename()).append(",");
    builder.append("]");

    double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
    logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
                              builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
    logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
    return toCompact.size();
}

That's a lot of work done in this method. :-) I have summarized some important points below:

  • check whether enough sstables are present to make the compaction worthwhile.

  • check whether the disk space is sufficient for this compaction task.

  • take a snapshot before compaction happens (if snapshot_before_compaction is enabled).

  • check that the sstables to be compacted all belong to the same column family.

  • CompactionExecutorStatsCollector begins the compaction with the AbstractCompactionIterable.

  • create a compaction writer and append the merged rows.

  • replace the old sstables with the newly compacted sstables.


I hope you enjoyed this write-up.
