
Sunday, July 3, 2016

apache cassandra 1.0.8: READ_STAGE threads hold references on sstables, so compaction cannot remove the sstables

Back when I was administering an apache cassandra 1.0.8 cluster, I noticed that a few sstables did not get removed even after compaction was done. The leftover sstables caused some administrative problems, and I suspected the cause might be that reads were still in progress on those sstables, so they could not be removed.

 DataTracker.java  
   
   private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)  
   {  
     View currentView, newView;  
     do  
     {  
       currentView = view.get();  
       newView = currentView.replace(oldSSTables, replacements);  
     }  
     while (!view.compareAndSet(currentView, newView));  
   
     addNewSSTablesSize(replacements);  
     removeOldSSTablesSize(oldSSTables);  
   
     cfstore.updateCacheSizes();  
   }  

I suppose that during the replacement of the view and sstables, everything is atomic, and hence a read will pick up the new sstables. But I do not have enough high level knowledge of how the various subsystems in cassandra work together. If you have an idea, please do leave your comment below.
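To make the atomicity point concrete, here is a minimal, self-contained sketch of the compare-and-swap pattern used in DataTracker.replace() above. The "view" here is just an immutable list of made-up file names, not the real DataTracker.View, but it shows why a read that grabbed the view before the swap can still be holding the old sstables:

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch only: an immutable "view" swapped atomically, as in DataTracker.replace().
public class ViewSwapSketch
{
    static final AtomicReference<List<String>> view = new AtomicReference<List<String>>(
            Collections.unmodifiableList(Arrays.asList("cf-hc-1-Data.db", "cf-hc-2-Data.db")));

    static void replace(Collection<String> oldSSTables, Collection<String> replacements)
    {
        List<String> current, next;
        do
        {
            current = view.get();
            next = new ArrayList<String>(current);
            next.removeAll(oldSSTables);
            next.addAll(replacements);
            next = Collections.unmodifiableList(next);
        }
        while (!view.compareAndSet(current, next)); // retry only if another writer swapped first
    }

    public static void main(String[] args)
    {
        // A read that fetched the view before the swap keeps the old snapshot (and, in
        // cassandra, references to the old sstables) until it finishes.
        List<String> inFlightRead = view.get();
        replace(Arrays.asList("cf-hc-1-Data.db", "cf-hc-2-Data.db"), Arrays.asList("cf-hc-3-Data.db"));
        System.out.println("in-flight read still sees: " + inFlightRead);
        System.out.println("new view:                  " + view.get());
    }
}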

This problem seemed to go away after we upgraded the cluster to 1.1. I know that by now (april 2016) cassandra 1.0, 1.1 or even 1.2 is ancient, but if you are on 1.0 or pre-1.0, you should really start moving to cassandra 3.x or at least 2.x.

Sunday, December 20, 2015

what happens to the old sstables after apache cassandra compaction is done

Last time we studied apache cassandra 1.0.8 compaction, and in this article we will focus on what happens after the sstables have been compacted. We start by reading the class CompactionTask, method execute(...), with a snippet covering the compacted sstables.

    ...
    ...
    cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
      SSTableReader key = ssTableReaderMapEntry.getKey();
      for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
        key.cacheKey(entry.getKey(), entry.getValue());
    }

After the compaction process is done, we see that the new sstables are persisted, the old sstables are replaced, and then the key cache is updated. The sstable replacement is what interests us in this article, so let us trace the execution calls made.

ColumnFamilyStore.java

  public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
  {
    data.replaceCompactedSSTables(sstables, replacements, compactionType);
  }

DataTracker.java 

  public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
  {
    replace(sstables, replacements);
    notifySSTablesChanged(sstables, replacements, compactionType);
  }

DataTracker.java

  private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
  {
    View currentView, newView;
    do
    {
      currentView = view.get();
      newView = currentView.replace(oldSSTables, replacements);
    }
    while (!view.compareAndSet(currentView, newView));

    addNewSSTablesSize(replacements);
    removeOldSSTablesSize(oldSSTables);

    cfstore.updateCacheSizes();
  }

DataTracker.java

  public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
  {
    for (INotificationConsumer subscriber : subscribers)
    {
      INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
      subscriber.handleNotification(notification, this);
    }
  }

At this point we see that replacing the compacted sstables consists of the actual replacement plus a notification that the sstable list has changed. But first we will take a look at the replacement process.

DataTracker.View.java

    public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
      List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
      IntervalTree intervalTree = buildIntervalTree(newSSTables);
      return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree);
    }

DataTracker.View.java

    private List<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
    {
      ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
      int newSSTablesSize = sstables.size() - oldSSTables.size() + Iterables.size(replacements);
      assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
      List<SSTableReader> newSSTables = new ArrayList<SSTableReader>(newSSTablesSize);
      for (SSTableReader sstable : sstables)
      {
        if (!oldSet.contains(sstable))
          newSSTables.add(sstable);
      }
      Iterables.addAll(newSSTables, replacements);
      assert newSSTables.size() == newSSTablesSize : String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", newSSTablesSize, newSSTables.size(), oldSSTables, replacements, this);
      return newSSTables;
    }

DataTracker.View.java

    private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
    {
      List<Interval> intervals = new ArrayList<Interval>(sstables.size());
      for (SSTableReader sstable : sstables)
        intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
      return new IntervalTree<SSTableReader>(intervals);
    }

DataTracker.java

  private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
  {
    for (SSTableReader sstable : newSSTables)
    {
      assert sstable.getKeySamples() != null;
      if (logger.isDebugEnabled())
        logger.debug(String.format("adding %s to list of files tracked for %s.%s",
              sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
      long size = sstable.bytesOnDisk();
      liveSize.addAndGet(size);
      totalSize.addAndGet(size);
      sstable.setTrackedBy(this);
    }
  }

DataTracker.java  

  private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
  {
    for (SSTableReader sstable : oldSSTables)
    {
      if (logger.isDebugEnabled())
        logger.debug(String.format("removing %s from list of files tracked for %s.%s",
              sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
      liveSize.addAndGet(-sstable.bytesOnDisk());
      sstable.markCompacted();
      sstable.releaseReference();
    }
  }

SSTableReader.java

  /**
   * Mark the sstable as compacted.
   * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
   * except for threads holding a reference.
   */
  public void markCompacted()
  {
    if (logger.isDebugEnabled())
      logger.debug("Marking " + getFilename() + " compacted");
    try
    {
      if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
        throw new IOException("Unable to create compaction marker");
    }
    catch (IOException e)
    {
      throw new IOError(e);
    }

    boolean alreadyCompacted = isCompacted.getAndSet(true);
    assert !alreadyCompacted : this + " was already marked compacted";
  }

SSTableReader.java

  public void releaseReference()
  {
    if (references.decrementAndGet() == 0 && isCompacted.get())
    {
      // Force finalizing mmapping if necessary
      ifile.cleanup();
      dfile.cleanup();

      deletingTask.schedule();
    }
    assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path;
  }

SSTableDeletingTask.java

  public void schedule()
  {
    StorageService.tasks.submit(this);
  }

ColumnFamilyStore.java  

  /**
   * Resizes the key and row caches based on the current key estimate.
   */
  public synchronized void updateCacheSizes()
  {
    long keys = estimateKeys();
    keyCache.updateCacheSize(keys);
    rowCache.updateCacheSize(keys);
  }

As shown above, quite a lot happens even in the replacement process! Summarizing the code trace above (a minimal sketch of the mark/release interplay follows the list):

  • an interval tree is built from the replacement sstables, and a new view containing them is returned.
  • this is retried in a compare-and-set loop until the new view is swapped in atomically.
  • addNewSSTablesSize makes the replacement sstables active and tracks their sizes.
  • finally it is time to remove the old sstables.
  • the old sstables are marked as compacted and are only deleted once they are no longer referenced by any thread.
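To make that last point concrete, below is a minimal, self-contained sketch of the mark-compacted / release-reference interplay. The class and method names are made up for illustration; the real logic lives in SSTableReader and SSTableDeletingTask shown above.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: files are deleted when the last reference is released AND the
// sstable has been marked compacted, so an in-flight read keeps the files on disk.
public class SSTableHandleSketch
{
    private final AtomicInteger references = new AtomicInteger(1); // 1 = tracked by the DataTracker
    private final AtomicBoolean compacted = new AtomicBoolean(false);
    private final String name;

    SSTableHandleSketch(String name) { this.name = name; }

    void acquire()       { references.incrementAndGet(); } // a read takes a reference
    void markCompacted() { compacted.set(true); }          // done by removeOldSSTablesSize()

    void release()
    {
        if (references.decrementAndGet() == 0 && compacted.get())
            System.out.println("scheduling deletion of " + name);
    }

    public static void main(String[] args)
    {
        SSTableHandleSketch old = new SSTableHandleSketch("cf-hc-1-Data.db");
        old.acquire();       // a READ_STAGE thread starts reading this sstable
        old.markCompacted();
        old.release();       // the tracker drops its reference: nothing happens yet
        old.release();       // the read finishes: only now is the deletion scheduled
    }
}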


Now, on to the method notifySSTablesChanged():

DataTracker.java

  public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
  {
    for (INotificationConsumer subscriber : subscribers)
    {
      INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
      subscriber.handleNotification(notification, this);
    }
  }

For each of the subscribers, a notification about the sstable list change is delivered, and any class that implements the interface should handle the change.
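For illustration, a consumer of these notifications could look something like the sketch below. The interface and notification class names follow the snippet above (assuming the usual org.apache.cassandra.notifications package); what the handler does with the notification is made up.

import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableListChangedNotification;

// Sketch only: a subscriber registered with the DataTracker would be called back here
// every time compaction swaps the sstable list.
public class LoggingSSTableListener implements INotificationConsumer
{
    public void handleNotification(INotification notification, Object sender)
    {
        if (notification instanceof SSTableListChangedNotification)
            // e.g. a secondary index or a cache could refresh its per-sstable state here
            System.out.println("sstable list changed, notified by " + sender);
    }
}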

Friday, January 16, 2015

operate cassandra using jmx in the terminal, including changing pool size, compacting sstables and key cache

If you operate an apache cassandra cluster and the load per node grows huge (nodetool info showing something like 800GB), compaction becomes a problem. It is a big problem for apache cassandra 1.0.8 if your load per node hovers around 600GB to 1TB: read performance suffers and at times the system load goes high. In some instances I noticed that when repair is running, the system load goes above 20. It is not a concern if the cluster is otherwise operating well, but the more often you see this, the more likely something has gone wrong. Today I will share my experience on how to operate cassandra when the node load is huge and the cassandra instance is still running. Often there is a nice method exposed via jmx, but to operate remotely, a jmx gui client such as jconsole is not ideal. Instead, we will use jmxterm for these operations in apache cassandra 1.0.8. So let's get started.

Changing pool size

It is pretty simple: launch jmxterm, set the bean, and then set the CorePoolSize. The steps are illustrated below.
$ java -jar jmxterm-1.0-alpha-4-uber.jar
$>open localhost:7199
#Connection to localhost:7199 is opened
$>bean org.apache.cassandra.request:type=ReplicateOnWriteStage
#bean is set to org.apache.cassandra.request:type=ReplicateOnWriteStage
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 32;
$>info
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage
#class name = org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor
# attributes
%0 - ActiveCount (int, r)
%1 - CompletedTasks (long, r)
%2 - CorePoolSize (int, rw)
%3 - CurrentlyBlockedTasks (int, r)
%4 - PendingTasks (long, r)
%5 - TotalBlockedTasks (int, r)
#there's no operations
#there's no notifications
$>set CorePoolSize 64
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 64;
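
If you would rather script this than type it interactively, the same attribute can be read and changed with a small JMX client in java. This is only a sketch: it assumes JMX is reachable on localhost:7199 without authentication or SSL, exactly as in the transcript above.

import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: bump CorePoolSize of the ReplicateOnWriteStage thread pool over JMX.
public class ResizeReplicateOnWriteStage
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName pool = new ObjectName("org.apache.cassandra.request:type=ReplicateOnWriteStage");
            System.out.println("before: " + mbs.getAttribute(pool, "CorePoolSize"));
            mbs.setAttribute(pool, new Attribute("CorePoolSize", 64));
            System.out.println("after : " + mbs.getAttribute(pool, "CorePoolSize"));
        }
        finally
        {
            connector.close();
        }
    }
}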

Alter key cache

Often, when there is heap pressure in the jvm, the safety valve kicks in and shrinks the caches. You can restart the cassandra instance, or you can set the key cache capacity back to its initial value. Assuming your column family is named FooBar and the keyspace is just4fun, the following steps illustrate how this is done.
$>bean org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#bean is set to org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
$>info
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#class name = org.apache.cassandra.cache.AutoSavingKeyCache
# attributes
%0 - Capacity (int, rw)
%1 - Hits (long, r)
%2 - RecentHitRate (double, r)
%3 - Requests (long, r)
%4 - Size (int, r)
#there's no operations
#there's no notifications
$>
$>get Size
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Size = 122307;

$>set Capacity 250000
#Value of attribute Capacity is set to 250000
$>get Capacity;
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
$>get Capacity
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Capacity = 250000;

Compact sstable

Lastly, to compact sstables. It is amazing that we have a single sstable as huge as 84GB! Triggering a major compaction is not an option here; when the load per node goes beyond 600GB, compaction takes forever, GC kicks in and the cpu keeps collecting heap, making the system load go high. So here we will select one huge sstable and compact only that one. You can also select a few sstables and compact them together, separated by commas.
$>bean org.apache.cassandra.db:type=CompactionManager
#bean is set to org.apache.cassandra.db:type=CompactionManager
$>run forceUserDefinedCompaction just4fun FooBar-hc-5-Index.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#RuntimeMBeanException: java.lang.IllegalArgumentException: FooBar-hc-5-Index.db does not appear to be a data file
$>run forceUserDefinedCompaction just4fun FooBar-hc-401-Data.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#operation returns:
null

The compaction should now be started; you can check the cassandra system log or nodetool compactionstats. So that's it, I hope you learned something.

Friday, May 2, 2014

How often does cassandra minor compaction run and what triggers it

There are two types of compaction in cassandra: minor compaction and major compaction. Today we are going to look into minor compaction and understand when a minor compaction is kickstarted.

The following is the description snippet shown when you create a column family using cassandra-cli.
- max_compaction_threshold: The maximum number of SSTables allowed before a
minor compaction is forced. Default is 32, setting to 0 disables minor
compactions.

Decreasing this will cause minor compactions to start more frequently and
be less intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

- min_compaction_threshold: The minimum number of SSTables needed
to start a minor compaction. Default is 4, setting to 0 disables minor
compactions.

Increasing this will cause minor compactions to start less frequently and
be more intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

So minor compaction is triggered automatically by cassandra, and major compaction is triggered manually via nodetool compact. But when, and what exactly, triggers a minor compaction? That's when we need to trace into the codebase.

Because compaction is performed per column family, minor compaction is triggered from the class ColumnFamilyStore. Two methods in that class submit the store to the compaction executor to perform the minor compaction.

Depending on the compaction strategy chosen for the column family (the default is SizeTieredCompactionStrategy, which extends AbstractCompactionStrategy), the super class starts a single thread to perform this background compaction task. It seems that this optional single-threaded task runs every five minutes.
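
As an illustration of that idea only (the class below is made up for this post and is not Cassandra code), a single-threaded scheduler that periodically re-checks whether a background compaction should be submitted might look like this:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustration only: one thread, waking up every five minutes to re-check a column family.
public class BackgroundCompactionCheckSketch
{
    public static void main(String[] args)
    {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Runnable check = new Runnable()
        {
            public void run()
            {
                // in cassandra this is roughly where CompactionManager.instance.submitBackground(cfs)
                // would be called for the column family being watched
                System.out.println("checking whether a minor compaction is needed");
            }
        };
        scheduler.scheduleWithFixedDelay(check, 5, 5, TimeUnit.MINUTES);
    }
}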

When either of the two methods mentioned above fires, the ColumnFamilyStore object is submitted to the background for the single thread to perform compaction.
/**
 * Call this whenever a compaction might be needed on the given columnfamily.
 * It's okay to over-call (within reason) since the compactions are single-threaded,
 * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
 */
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
    Callable<Integer> callable = new Callable<Integer>()
    {
        public Integer call() throws IOException
        {
            compactionLock.readLock().lock();
            try
            {
                if (!cfs.isValid())
                    return 0;

                boolean taskExecuted = false;
                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
                for (AbstractCompactionTask task : tasks)
                {
                    if (!task.markSSTablesForCompaction())
                        continue;

                    taskExecuted = true;
                    try
                    {
                        task.execute(executor);
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
                // newly created sstables might have made other compactions eligible
                if (taskExecuted)
                    submitBackground(cfs);
            }
            finally
            {
                compactionLock.readLock().unlock();
            }
            return 0;
        }
    };
    return executor.submit(callable);
}

Notice that when the method getBackgroundTasks is called from submitBackground(), the min_compaction_threshold and max_compaction_threshold that you set on the column family are used to decide whether there are enough similarly sized sstables to compact, and how many of them to merge at once.
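
To make the threshold check concrete, here is a rough, self-contained sketch of the size-tiered idea: sstables of similar size are grouped into buckets, and a bucket only becomes a compaction candidate once it holds at least min_compaction_threshold sstables, capped at max_compaction_threshold. This is an approximation for illustration, not the exact 1.0.8 SizeTieredCompactionStrategy code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Rough sketch of size-tiered bucketing; sizes are in MB and entirely made up.
public class SizeTieredSketch
{
    static List<List<Long>> buckets(List<Long> sstableSizes, double bucketLow, double bucketHigh)
    {
        List<List<Long>> buckets = new ArrayList<List<Long>>();
        List<Long> sorted = new ArrayList<Long>(sstableSizes);
        Collections.sort(sorted);
        for (long size : sorted)
        {
            List<Long> target = null;
            for (List<Long> bucket : buckets)
            {
                long sum = 0;
                for (long s : bucket)
                    sum += s;
                double avg = (double) sum / bucket.size();
                if (size >= avg * bucketLow && size <= avg * bucketHigh)
                {
                    target = bucket;
                    break;
                }
            }
            if (target == null)
            {
                target = new ArrayList<Long>();
                buckets.add(target);
            }
            target.add(size);
        }
        return buckets;
    }

    public static void main(String[] args)
    {
        int minThreshold = 4, maxThreshold = 32;
        List<Long> sizes = Arrays.asList(9L, 10L, 11L, 12L, 13L, 400L);
        for (List<Long> bucket : buckets(sizes, 0.5, 1.5))
        {
            if (bucket.size() >= minThreshold)
                // at most maxThreshold sstables from a bucket are merged in one go
                System.out.println("compact together: " + bucket.subList(0, Math.min(bucket.size(), maxThreshold)));
        }
    }
}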

From experience, I am not sure why datastax does not recommend major compaction via nodetool; maybe because the I/O and heap usage spikes can impair the node's request handling and response times. But for me, when the node load goes beyond roughly 500GB, there may be stale data left in the big sstables, so it might not be such a bad idea to kickstart a major compaction if the stale data can be removed and the node load brought down.

Last but not least, if you learn something and would like to contribute back, please go to our donation page.

Sunday, April 27, 2014

code study in cassandra 1.0.8 compaction to check what actually gets removed

Previously we covered topics such as compaction via jconsole and a general study of compaction. What this article focuses on is: when compaction happens, what happens to the data that is marked as deleted, that is, the tombstones?

Continuing from where we left off in the previous article, in the method CompactionTask.execute(), snippet below:
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                              ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                              : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());

Calling ci.iterator() returns an iterator built on a new Reducer(), and it is in this class that a fully deleted row is dropped: it is not written to the new sstable, and its entry is removed from the cache.

protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
{
    protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();

    public void reduce(IColumnIterator current)
    {
        rows.add((SSTableIdentityIterator) current);
    }

    protected AbstractCompactedRow getReduced()
    {
        assert !rows.isEmpty();

        try
        {
            AbstractCompactedRow compactedRow = controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
            if (compactedRow.isEmpty())
            {
                controller.invalidateCachedRow(compactedRow.key);
                return null;
            }
            else
            {
                // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
                // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
                // memory on long running instances
                controller.removeDeletedInCache(compactedRow.key);
            }

            return compactedRow;
        }
        finally
        {
            rows.clear();
            if ((row++ % 1000) == 0)
            {
                long n = 0;
                for (SSTableScanner scanner : scanners)
                    n += scanner.getFilePointer();
                bytesRead = n;
                throttle.throttle(bytesRead);
            }
        }
    }
}

The logic is similar, and below is the logic that removes expired columns from a standard column family.
private static void removeDeletedStandard(ColumnFamily cf, int gcBefore)
{
    Iterator<IColumn> iter = cf.iterator();
    while (iter.hasNext())
    {
        IColumn c = iter.next();
        ByteBuffer cname = c.name();
        // remove columns if
        // (a) the column itself is tombstoned or
        // (b) the CF is tombstoned and the column is not newer than it
        //
        // Note that we need the inequality below for case (a) to be strict for expiring columns
        // to work correctly -- see the comment in ExpiringColumn.isMarkedForDelete().
        if ((c.isMarkedForDelete() && c.getLocalDeletionTime() < gcBefore)
            || c.timestamp() <= cf.getMarkedForDeleteAt())
        {
            iter.remove();
        }
    }
}

So that is pretty obvious: columns and rows get removed if these conditions are satisfied.
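
As a tiny worked example of those two conditions (all the numbers below are made up purely for illustration):

// Made-up example of the two purge conditions in removeDeletedStandard() above.
public class TombstonePurgeExample
{
    public static void main(String[] args)
    {
        int gcBefore = 1400000000;          // "now minus gc_grace_seconds", in seconds
        int localDeletionTime = 1399000000; // when the column was tombstoned, in seconds
        long columnTimestamp = 10;          // the column's write timestamp
        long cfMarkedForDeleteAt = 20;      // the row-level tombstone's timestamp
        boolean columnIsTombstone = true;

        // (a) the column itself is a tombstone that expired before gcBefore, or
        // (b) the whole row is tombstoned and the column is not newer than that tombstone
        boolean purge = (columnIsTombstone && localDeletionTime < gcBefore)
                        || columnTimestamp <= cfMarkedForDeleteAt;
        System.out.println("column can be dropped during this compaction: " + purge);
    }
}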

Last but not least, if you are happy reading this and learned something, please remember to donate too.

Saturday, April 12, 2014

Investigating cassandra 1.0.8 compaction

So what happens when you trigger compact via nodetool? In a nutshell, it goes through a series of low level java calls.

The execution starts in NodeCmd.java and flows through NodeProbe.java, StorageServiceMBean.java, StorageService.java, ColumnFamilyStore.java, CompactionManager.java, AbstractCompactionTask.java and CompactionTask.java.

Once the NodeProbe object is established, the method forceTableCompaction(...) is called. Within NodeProbe there is a StorageServiceMBean, which is the JMX bean interface implemented by the class StorageService.

What forceTableCompaction(...) does is iterate over the column families and start a major compaction on each. Code snippet below:
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
    for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
    {
        cfStore.forceMajorCompaction();
    }
}

So it is pretty clear that the execution gets the valid column families and calls forceMajorCompaction() on each of them. What actually happens is that, within forceMajorCompaction(), this object (the ColumnFamilyStore) is passed to the CompactionManager singleton to perform an operation known as maximal.
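
For completeness, the same forceTableCompaction(...) operation can also be invoked directly over JMX without nodetool. This is only a sketch: it assumes JMX is reachable on localhost:7199 without authentication and that the StorageService MBean is registered under org.apache.cassandra.db:type=StorageService; the keyspace and column family names are reused from the jmxterm post above.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: call forceTableCompaction(keyspace, columnFamilies...) on the StorageService MBean.
public class ForceMajorCompactionSketch
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
            mbs.invoke(storageService,
                       "forceTableCompaction",
                       new Object[] { "just4fun", new String[] { "FooBar" } },
                       new String[] { String.class.getName(), String[].class.getName() });
        }
        finally
        {
            connector.close();
        }
    }
}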

Within the CompactionManager class, the cfStore object is processed concurrently: it is submitted to an executor as a callable. To explain better, let's read the general compaction framework below:
public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
    Callable<Object> callable = new Callable<Object>()
    {
        public Object call() throws IOException
        {
            // acquire the write lock long enough to schedule all sstables
            compactionLock.writeLock().lock();
            try
            {
                if (!cfStore.isValid())
                    return this;
                AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                {
                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                        return this;
                    try
                    {
                        // downgrade the lock acquisition
                        compactionLock.readLock().lock();
                        compactionLock.writeLock().unlock();
                        try
                        {
                            return task.execute(executor);
                        }
                        finally
                        {
                            compactionLock.readLock().unlock();
                        }
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
            }
            finally
            {
                // we probably already downgraded
                if (compactionLock.writeLock().isHeldByCurrentThread())
                    compactionLock.writeLock().unlock();
            }
            return this;
        }
    };
    return executor.submit(callable);
}

To summarize:

  • the compaction write lock is acquired.

  • the cfStore object is checked again to make sure it is still valid.

  • the compaction strategy is retrieved from the cfStore object.

  • the SSTables are marked for compaction.

  • the task is executed on the CompactionExecutor.


Currently there are two compaction strategies in this version, SizeTieredCompactionStrategy and LeveledCompactionStrategy; this discussion continues based on SizeTieredCompactionStrategy.

The real compaction work is done here.
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
    // The collection of sstables passed may be empty (but not null); even if
    // it is not empty, it may compact down to nothing if all rows are deleted.
    assert sstables != null;

    Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
    if (!isCompactionInteresting(toCompact))
        return 0;

    if (compactionFileLocation == null)
        compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    if (partialCompactionsAcceptable())
    {
        // If the compaction file path is null that means we have no space left for this compaction.
        // Try again w/o the largest one.
        if (compactionFileLocation == null)
        {
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all
                // the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }

        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact even the two smallest files, aborting");
            return 0;
        }
    }

    if (DatabaseDescriptor.isSnapshotBeforeCompaction())
        cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

    // sanity check: all sstables must belong to the same cfs
    for (SSTableReader sstable : toCompact)
        assert sstable.descriptor.cfname.equals(cfs.columnFamily);

    CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
    // new sstables from flush can be added during a compaction, but only the compaction can remove them,
    // so in our single-threaded compaction world this is a valid way of determining if we're compacting
    // all the sstables (that existed when we started)
    logger.info("Compacting {}", toCompact);

    long startTime = System.currentTimeMillis();
    long totalkeysWritten = 0;

    long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
    long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
    long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
    if (logger.isDebugEnabled())
        logger.debug("Expected bloom filter size : " + keysPerSSTable);

    AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                  ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                                  : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
    CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
    Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
    Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

    // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
    // replace the old entries. Track entries to preheat here until then.
    Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

    Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
    Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

    if (collector != null)
        collector.beginCompaction(ci);
    try
    {
        if (!nni.hasNext())
        {
            // don't mark compacted in the finally block, since if there _is_ nondeleted data,
            // we need to sync it (via closeAndOpen) first, so there is no period during which
            // a crash could cause data loss.
            cfs.markCompacted(toCompact);
            return 0;
        }

        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
        writers.add(writer);
        while (nni.hasNext())
        {
            AbstractCompactedRow row = nni.next();
            if (row.isEmpty())
                continue;

            long position = writer.append(row);
            totalkeysWritten++;

            if (DatabaseDescriptor.getPreheatKeyCache())
            {
                for (SSTableReader sstable : toCompact)
                {
                    if (sstable.getCachedPosition(row.key, false) != null)
                    {
                        cachedKeys.put(row.key, position);
                        break;
                    }
                }
            }
            if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
            {
                SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                cachedKeyMap.put(toIndex, cachedKeys);
                sstables.add(toIndex);
                if (nni.hasNext())
                {
                    writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                    writers.add(writer);
                    cachedKeys = new HashMap<DecoratedKey, Long>();
                }
            }
        }
    }
    catch (Exception e)
    {
        for (SSTableWriter writer : writers)
            writer.abort();
        throw FBUtilities.unchecked(e);
    }
    finally
    {
        iter.close();
        if (collector != null)
            collector.finishCompaction(ci);
    }

    cfs.replaceCompactedSSTables(toCompact, sstables);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
        SSTableReader key = ssTableReaderMapEntry.getKey();
        for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
            key.cacheKey(entry.getKey(), entry.getValue());
    }

    long dTime = System.currentTimeMillis() - startTime;
    long startsize = SSTable.getTotalBytes(toCompact);
    long endsize = SSTable.getTotalBytes(sstables);
    double ratio = (double)endsize / (double)startsize;

    StringBuilder builder = new StringBuilder();
    builder.append("[");
    for (SSTableReader reader : sstables)
        builder.append(reader.getFilename()).append(",");
    builder.append("]");

    double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
    logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
                              builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
    logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
    return toCompact.size();
}

That's a lot of work done in this method. :-) I have summarized some important points below:

  • check whether enough sstables are present to compact.

  • check whether the disk space is sufficient for this compaction task.

  • snapshot before the compaction happens (if configured).

  • check that the sstables to be compacted belong to the same column family.

  • CompactionExecutorStatsCollector begins compaction with the AbstractCompactionIterable.

  • create a compaction writer and write out the merged rows.

  • replace the old sstables with the newly compacted sstables.


I hope you enjoy this writing.

Thursday, January 16, 2014

force sstable compaction through jconsole

Happy new year everyone. This is my first article for 2014 and, as a start, it is going to be a short and sweet one. :-)

I was working on a project where I had deleted a column, and I remembered that, per the definition of a tombstone, you need to run a compaction so that the tombstone is removed. When I ran nodetool compact, the message below appeared.
INFO 19:15:09,170 Nothing to compact in index1.  Use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)

What this means is that you need a JMX client such as jconsole running, because forceUserDefinedCompaction can only be invoked through JMX. When jconsole is connected to your cassandra daemon process, you need to navigate to the CompactionManager mbean.

Then you need to provide two values to this method, as it expects two parameters: the keyspace and the sstables you wish to compact. You can see in the attachment how I did it.
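
If you prefer the terminal, the same operation can also be invoked with jmxterm, as in the jmxterm post above. A sketch below: the keyspace lucene1 matches the data directory listing further down, while the sstable file name is purely illustrative.

$>bean org.apache.cassandra.db:type=CompactionManager
#bean is set to org.apache.cassandra.db:type=CompactionManager
$>run forceUserDefinedCompaction lucene1 index1-hc-1-Data.db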



Once the operation is complete, you can ls the data directory where the sstables are stored; a new sstable should have been generated there.

$ ls data/lucene1/
index1-hc-3-Data.db index1-hc-3-Digest.sha1 index1-hc-3-Filter.db index1-hc-3-Index.db index1-hc-3-Statistics.db snapshots

We have a production machine where a single sstable is more than 25GB. When the compaction soft limit is met (default min 4 and max 32), I think a compaction of that size will load the server when it takes place, so it is probably best to run forceUserDefinedCompaction on that single large sstable instead.

Tuesday, December 31, 2013

Cassandra cluster jmx metrics inspection to decide if cluster expansion is justifiable

One of the important decisions when managing clusters is determining when cluster capacity should be expanded. Keeping a cluster at optimal performance keeps the applications working nicely and, most importantly, gives confidence to the people running them.

So, to answer a question like "how do I determine if my cluster is at a bottleneck?", you will need to have measuring tools ready and measure over time. That means displaying statistics in graphical form; with the history, it should give an indication of the cluster's performance.

Because the topic can grow huge, we will focus on a specific metric. This article inspects a metric exposed by the jmx beans. In order to inspect jmx metrics, you will need a jmx client. There is a gui jmx client that comes with the jdk, jconsole, but given the nature of this article, I would suggest you go for a cli jmx client, for example jmxterm. You can read an introduction to jmxterm here.

There are many metrics exposed by cassandra jmx beans, but we will focus on the bean org.apache.cassandra.db:type=CompactionManager.

If you are using jmxterm, you can read the output below:
$ cat test.script 
open localhost:7199
bean org.apache.cassandra.db:type=CompactionManager
get PendingTasks
$ java -jar jmxterm-1.0-alpha-4-uber.jar -i test.script
Welcome to JMX terminal. Type "help" for available commands.
#Connection to localhost:7199 is opened
#bean is set to org.apache.cassandra.db:type=CompactionManager
#mbean = org.apache.cassandra.db:type=CompactionManager:
PendingTasks = 0;

If you plot PendingTasks in a graph over time at a periodic interval, it should give insight into your cluster's performance. You can also plot the statistics output from nodetool tpstats. I would suggest you also plot the dropped message types, as those metrics indicate over time when performance is impacted. If you are running stock cassandra settings, you will probably want to fine tune your nodes after this investigation and analysis of the graphs.
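
If you want to collect this metric for graphing, a small java poller can print a timestamped value once a minute; redirect the output to a file and plot it with your favourite tool. This is only a sketch, assuming JMX on localhost:7199 without authentication.

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: poll CompactionManager.PendingTasks every minute and print "timestamp,value".
public class PendingTasksPoller
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        final MBeanServerConnection mbs = connector.getMBeanServerConnection();
        final ObjectName compactionManager = new ObjectName("org.apache.cassandra.db:type=CompactionManager");

        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable()
        {
            public void run()
            {
                try
                {
                    Object pending = mbs.getAttribute(compactionManager, "PendingTasks");
                    System.out.println(System.currentTimeMillis() + "," + pending);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        }, 0, 60, TimeUnit.SECONDS);
    }
}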

There are no best strategies; as mentioned earlier, you need experience, and there are many other measuring tools, for example sar, iostat, top and your application's own measurements. It takes some time to master all of these, but it is crucial if you manage a production cluster.