1: ...
2: ...
3: cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
4: // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
5: for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
6: {
7: SSTableReader key = ssTableReaderMapEntry.getKey();
8: for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
9: key.cacheKey(entry.getKey(), entry.getValue());
10: }
After the sstables compaction process is done, we see that the new sstable is persist and the old sstables are replaces. After that, the key cache is also updated. Onto the sstables replacements is where we interested in this article. Tracing down execution calls made.
ColumnFamilyStore.java
1: public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
2: {
3: data.replaceCompactedSSTables(sstables, replacements, compactionType);
4: }
DataTracker.java
1: public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
2: {
3: replace(sstables, replacements);
4: notifySSTablesChanged(sstables, replacements, compactionType);
5: }
DataTracker.java
1: private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
2: {
3: View currentView, newView;
4: do
5: {
6: currentView = view.get();
7: newView = currentView.replace(oldSSTables, replacements);
8: }
9: while (!view.compareAndSet(currentView, newView));
10:
11: addNewSSTablesSize(replacements);
12: removeOldSSTablesSize(oldSSTables);
13:
14: cfstore.updateCacheSizes();
15: }
DataTracker.java
1: public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
2: {
3: for (INotificationConsumer subscriber : subscribers)
4: {
5: INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
6: subscriber.handleNotification(notification, this);
7: }
8: }
At this point, replace compacted sstables consists of actual replacements and notify on sstables changed. But first we will take a look at replacement process.
DataTracker.View.java
1: public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
2: {
3: List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
4: IntervalTree intervalTree = buildIntervalTree(newSSTables);
5: return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree);
6: }
DataTracker.View.java
1: private List<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
2: {
3: ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
4: int newSSTablesSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);
5: assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
6: List<SSTableReader> newSSTables = new ArrayList<SSTableReader>(newSSTablesSize);
7: for (SSTableReader sstable : sstables)
8: {
9: if (!oldSet.contains(sstable))
10: newSSTables.add(sstable);
11: }
12: Iterables.addAll(newSSTables, replacements);
13: assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);
14: return newSSTables;
15: }
DataTracker.View.java
1: private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
2: {
3: List<Interval> intervals = new ArrayList<Interval>(sstables.size());
4: for (SSTableReader sstable : sstables)
5: intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
6: return new IntervalTree<SSTableReader>(intervals);
7: }
DataTracker.java
1: private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
2: {
3: for (SSTableReader sstable : newSSTables)
4: {
5: assert sstable.getKeySamples() != null;
6: if (logger.isDebugEnabled())
7: logger.debug(String.format("adding %s to list of files tracked for %s.%s",
8: sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
9: long size = sstable.bytesOnDisk();
10: liveSize.addAndGet(size);
11: totalSize.addAndGet(size);
12: sstable.setTrackedBy(this);
13: }
14: }
DataTracker.java
1: private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
2: {
3: for (SSTableReader sstable : oldSSTables)
4: {
5: if (logger.isDebugEnabled())
6: logger.debug(String.format("removing %s from list of files tracked for %s.%s",
7: sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
8: liveSize.addAndGet(-sstable.bytesOnDisk());
9: sstable.markCompacted();
10: sstable.releaseReference();
11: }
12: }
SSTableReader.java
1: /**
2: * Mark the sstable as compacted.
3: * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
4: * except for threads holding a reference.
5: */
6: public void markCompacted()
7: {
8: if (logger.isDebugEnabled())
9: logger.debug("Marking " + getFilename() + " compacted");
10: try
11: {
12: if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
13: throw new IOException("Unable to create compaction marker");
14: }
15: catch (IOException e)
16: {
17: throw new IOError(e);
18: }
19:
20: boolean alreadyCompacted = isCompacted.getAndSet(true);
21: assert !alreadyCompacted : this + " was already marked compacted";
22: }
SSTableReader.java
1: public void releaseReference()
2: {
3: if (references.decrementAndGet() == 0 && isCompacted.get())
4: {
5: // Force finalizing mmapping if necessary
6: ifile.cleanup();
7: dfile.cleanup();
8:
9: deletingTask.schedule();
10: }
11: assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
12: }
SSTableDeletingTask.java
1: public void schedule()
2: {
3: StorageService.tasks.submit(this);
4: }
ColumnFamilyStore.java
1: /**
2: * Resizes the key and row caches based on the current key estimate.
3: */
4: public synchronized void updateCacheSizes()
5: {
6: long keys = estimateKeys();
7: keyCache.updateCacheSize(keys);
8: rowCache.updateCacheSize(keys);
9: }
As shown above, there are many things even in the replacement process! We can summarized based on the code trace above,
- an interval tree is built using the replacement sstable. After that, that new view is returned.
- the process above is repeated until the view become equal.
- addNewSSTablesSize make the replacement sstable become active.
- finally it is time to remove the old sstables.
- the old sstables will be marks as compacted and then remove when it is no longer reference by threads.
Onto the method notifySSTablesChanged(),
DataTracker.java
1: public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
2: {
3: for (INotificationConsumer subscriber : subscribers)
4: {
5: INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
6: subscriber.handleNotification(notification, this);
7: }
8: }
For each of the subscribers, the sstable list change is notified and class that implement the interface should handle the changed.