Showing posts with label cassandra1.0.8. Show all posts
Showing posts with label cassandra1.0.8. Show all posts

Saturday, May 3, 2014

what and why always all time blocked for cassandra pool FlushWriter

FlushWriter                       0         0            941         0                53

If you noticed in a cassandra cluster, I often noticed that the pool FlushWriter all time block always increased while other pool remain 0. So is this that we should concern of?

Snippet from class ColumnFamilyStore:
/*
* maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
* we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
*
* There are two other things that maybeSwitchMemtable does.
* First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
* and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
* that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
* to happen simultaneously on multicore systems, while still calling onMF in the correct order,
* which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
* called, all data up to the given context has been persisted to SSTables.
*/
private static final ExecutorService flushWriter
= new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
new NamedThreadFactory("FlushWriter"),
"internal");

Just like other Stage.replicate_on_write, FlushWriter is also an instance of JMXEnabledThreadPoolExecutor, governed by two configuration which you can altered in cassandra.yaml.

  • memtable_flush_writers default based on number of data_file_directories specified.

  • memtable_flush_queue_size default 4


Whenever maybeSwitchMemtable is called, memtable.flushAndSignal() is called within.

Notice that in Memtable.flushAndSignal(), ExecutorService which is extends a few until the construction object JMXEnabledThreadPoolExecutor for pool FlushWriter aforementioned.  So whenever, the task is rejected due to queue full, method rejectedExecution() is triggered  which eventually increase the count by one.

So that's it, hope you get an idea what and why is the all time block for pool FlushWriter is increased, so it should give indication you should altered the parameter for the two configuration in cassandra.yaml file.

Last, if you learned something and would like to contribute back, please visit our donation page. Thank you.

Friday, May 2, 2014

How often is cassandra minor compaction running and what trigger it

There are two types of compactions in cassandra. The minor compaction and the major compaction. Today, we are going to look into minor compaction and to understand when is minor compaction kickstarted.

Following are description snippet when you create column family using cassandra-cli.
- max_compaction_threshold: The maximum number of SSTables allowed before a
minor compaction is forced. Default is 32, setting to 0 disables minor
compactions.

Decreasing this will cause minor compactions to start more frequently and
be less intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

- min_compaction_threshold: The minimum number of SSTables needed
to start a minor compaction. Default is 4, setting to 0 disables minor
compactions.

Increasing this will cause minor compactions to start less frequently and
be more intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

So minor compaction is trigger automatically by cassandra and major compaction is trigger manually via nodetool compact. But when and what exactly that trigger minor compaction? That's when we need to trace into the codebase.

Because compaction is performed on the column family, thus the minor compaction is trigger in the class ColumnFamilyStore. Two methods that will submit this object for compaction executor to perform the minor compaction, that is during

Depend on the compaction strategy chosen for the column family, the default SizeTieredCompactionStrategy which extends AbstractCompactionStrategy and in the super class, which started a single thread to perform this background compaction task. It seem that this optional single threaded task run every five minute.

When the mentioned two method trigger, the object ColumnFamilyStore will be submit to the background for the single thread to perform compaction.
/**
* Call this whenever a compaction might be needed on the given columnfamily.
* It's okay to over-call (within reason) since the compactions are single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing phase.
*/
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
compactionLock.readLock().lock();
try
{
if (!cfs.isValid())
return 0;

boolean taskExecuted = false;
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
for (AbstractCompactionTask task : tasks)
{
if (!task.markSSTablesForCompaction())
continue;

taskExecuted = true;
try
{
task.execute(executor);
}
finally
{
task.unmarkSSTables();
}
}
// newly created sstables might have made other compactions eligible
if (taskExecuted)
submitBackground(cfs);
}
finally
{
compactionLock.readLock().unlock();
}
return 0;
}
};
return executor.submit(callable);
}

Notice that when method getBackgroundTasks is called in submitBackground(), the min_compaction_threshold and max_compaction_threshold which you set in the column family is called here to determine if condition min_compaction_threshold is met and max_compaction_threshold.

From the experience, I don't know why datastax does not recommend major compaction via nodetool, maybe because the I/O and heap usage spike and may impair the node request and response but for me, when the node load goes beyond like 500GB, then there maybe be some stale data left in the big sstables, so it might not be a really such a bad idea to kickstart major compaction if the stale data can be removed and bring down the node load.

Last but not least, if you learn something and would like to contribute back, please go to our donation page.

Sunday, April 27, 2014

code study in cassandra compaction 108 and check what is actually gets remove

Last we covered topic such as compaction via jconsole and general study into compaction and what this article is going to focus is, when compaction happened, what happened to the data that is marked as delete, that is the tombstone?

Continue to where we left in previous article, in the method CompactionTask.execute() , snippet below:
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
: new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());

calling ci.iterator() return a new Reducer() where this class will perform remove this row from cache and sstable.

protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
{
protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();

public void reduce(IColumnIterator current)
{
rows.add((SSTableIdentityIterator) current);
}

protected AbstractCompactedRow getReduced()
{
assert !rows.isEmpty();

try
{
AbstractCompactedRow compactedRow = controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
if (compactedRow.isEmpty())
{
controller.invalidateCachedRow(compactedRow.key);
return null;
}
else
{
// If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
// like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
// memory on long running instances
controller.removeDeletedInCache(compactedRow.key);
}

return compactedRow;
}
finally
{
rows.clear();
if ((row++ % 1000) == 0)
{
long n = 0;
for (SSTableScanner scanner : scanners)
n += scanner.getFilePointer();
bytesRead = n;
throttle.throttle(bytesRead);
}
}
}
}

The logic is similar and below is the logic to remove the expired column from the standard column family.
private static void removeDeletedStandard(ColumnFamily cf, int gcBefore)
{
Iterator<IColumn> iter = cf.iterator();
while (iter.hasNext())
{
IColumn c = iter.next();
ByteBuffer cname = c.name();
// remove columns if
// (a) the column itself is tombstoned or
// (b) the CF is tombstoned and the column is not newer than it
//
// Note that we need the inequality below for case (a) to be strict for expiring columns
// to work correctly -- see the comment in ExpiringColumn.isMarkedForDelete().
if ((c.isMarkedForDelete() && c.getLocalDeletionTime() < gcBefore)
|| c.timestamp() <= cf.getMarkedForDeleteAt())
{
iter.remove();
}
}
}

So that's pretty obvious. columns and rows get remove if the condition is satisfied.

Last but not least, if you are happy reading this and learn something, please remember to donate too.

Saturday, April 26, 2014

study gc parameters in cassandra 1.0.8

Today we are going to study the GC parameter in the file cassandra-env.sh
. Below are the GC parameter extracted from cassandra 1.0.8 environment file cassandra-env.sh . So let's study them one by one what is the parameter means and what can be change.
# GC tuning options
JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"

# GC logging options -- uncomment to enable
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDateStamps"
# JVM_OPTS="$JVM_OPTS -XX:+PrintHeapAtGC"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -XX:+PrintPromotionFailure"
# JVM_OPTS="$JVM_OPTS -XX:PrintFLSStatistics=1"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"

-XX:+UseParNewGC

Use parallel algorithm for young space collection.

-XX:+UseConcMarkSweepGC

Use Concurrent Mark-Sweep GC in the old generation

-XX:SurvivorRatio=8

Ratio of eden/survivor space size. The default value is 8

-XX:MaxTenuringThreshold=1

Max value for tenuring threshold.

-XX:CMSInitiatingOccupancyFraction=75

Percentage CMS generation occupancy to start a CMS collection cycle (A negative value means that CMSTirggerRatio is used).

-XX:+UseCMSInitiatingOccupancyOnly

Only use occupancy as a criterion for starting a CMS collection.

 

-XX:+PrintGCDetails

Print more elaborated GC info

-XX:+PrintGCDateStamps

Print date stamps at garbage collection events (e.g. 2011-09-08T14:20:29.557+0400: [GC... )

-XX:+PrintHeapAtGCPrint

heap layout before and after each GC

-XX:+PrintTenuringDistribution

Print detailed demography of young space after each collection

-XX:+PrintGCApplicationStoppedTime

Print the time the application has been stopped

-XX:+PrintPromotionFailure

Print additional diagnostic information following promotion failure


-XX:PrintFLSStatistics=1

Print additional info concerning free lists


-Xloggc:<file>

Redirects GC output to file instead of console

The first part of GC tuning is geared toward which GC strategy to use in cassandra. The second GC tuning is more toward fine tune GC logging example timestamp, heap layaout, etc. If you want to get even more challenging, I end this article by providing a few good links for your further references.

http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html
http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html
http://docs.oracle.com/javase/7/docs/technotes/tools/windows/java.html
http://library.blackboard.com/ref/df5b20ed-ce8d-4428-a595-a0091b23dda3/Content/_admin_server_optimize/optimize_non_standard_jvm_arguments.htm

Last but not least, if you are happy reading this and learn something, please remember to donate too.

Friday, April 25, 2014

code dive into cassandra Stage.REPLICATE_ON_WRITE

If you are administrator of a cassandra cluster, sometime you may notice StatusLogger started to flood in cassandra system.log. Example below is the log snippet found in system.log. So what and why this happened? Let us read into the codes.
 INFO [ScheduledTasks:1] 2014-04-17 14:18:00,079 StatusLogger.java (line 65) ReplicateOnWriteStage            17        17         0

StatusLogger started to write about the node thread pools into cassandra system.log under two conditions:

These indications will give an idea that the node is under stress. As you have noticed from system.log, there are many stages involved and with this article, we are going to focus on the metric Stage.REPLICATE_ON_WRITE.

What is replicate on write stage? From the code description, Replicate every counter update from the leader to the follower replicas. Accepts the values true and false. Aside from the code description, we are going to understand this stage by studying into the code.

There are 11 stages involved. When CassandraDaemon class kickstarted, StageManager is called and stages were initialized. Of cause, Stage.REPLICATE_ON_WRITE is one of the stages. An JMXConfigurableThreadPoolExecutor object with configuration 32 threads and 60 seconds keep alive is initialized. When this happened, this object is also registered to MBean server.

Apparently replicate on write stage is only trigger by column family with type counter and the code snippet below is the only code that increment replicate on write metric.
private static Runnable counterWriteTask(final IMutation mutation,
final Collection<InetAddress> targets,
final IWriteResponseHandler responseHandler,
final String localDataCenter,
final ConsistencyLevel consistency_level)
{
return new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
assert mutation instanceof CounterMutation;
final CounterMutation cm = (CounterMutation) mutation;

// apply mutation
cm.apply();
responseHandler.response(null);

// then send to replicas, if any
targets.remove(FBUtilities.getBroadcastAddress());
if (cm.shouldReplicateOnWrite() && !targets.isEmpty())
{
// We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION stage
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
{
public void runMayThrow() throws IOException, TimeoutException
{
// send mutation to other replica
sendToHintedEndpoints(cm.makeReplicationMutation(), targets, responseHandler, localDataCenter, consistency_level);
}
});
}
}
};
}

Whenever ThreadPoolExecutor execute the object DroppableRunner, the task will be execute by a thread in the thread pool executor.

Interface IExecutorMBean exposed three metric:

  • getActiveCount

  • getCompletedTasks

  • getPendingTasks


and interface JMXEnabledThreadPoolExecutorMBean exposed two more metrics:

  • getTotalBlockedTasks

  • getCurrentlyBlockedTasks


StatusLogger.log exposed getActiveCount, getPendingTasks and getCurrentlyBlockedTasks, hence the three columns per stage in the system.log output.

getActiveCount
get active count is actually implemented within class ThreadPoolExecutor. Whenever a worker is running a task, it is consider as an active task and this is consider as one count.

getCompletedTasks
get completed tasks were actually a wrapper to ThreadPoolExecutor.getCompletedTaskCount(). Whenever a worker is finished executed a task, this is consider one count.

getTotalBlockedTasks
when DebuggableThreadPoolExecutor object was initialized, a rejected execution handler is set. Whenever within ThreadPoolExecutor reject a command, rejectedExecution() is trigger and executed. So this translate to one reject is equivalent as one count.

That's about it for this article. When I study into this code and write this article, I get amazed on how this code is structured and it is complex. I would really recommend into study ThreadPoolExecutor.java as cassandra stage reference this code throughout.

Last but not least, if you are happy reading this and learn something, please remember to donate too.

Saturday, April 12, 2014

Investigate into cassandra 1.0.8 compaction

So what happened what you trigger compact via nodetool? In a nutshell, it goes into a series of low levels java calls.

The execution started on NodeCmd.java, NodeProbe.java, StorageServiceMBean.java, StorageService.java, ColumnFamilyStore.java, CompactionManager.java, AbstractCompactionTask.java and CompactionTask.java

Once object NodeProbe is establish, method forceTableCompaction (...) is called. Within NodeProbe, there is another called StorageServiceMBean which is the JMX bean interface implemented by class StorageService.

what forceTableCompaction(...) does is that, it iterate over the column families and start major compaction. Code snippet below:
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
{
cfStore.forceMajorCompaction();
}
}

So it is pretty clear that, the execution goes by getting a valid column families and start to call its method forceMajorCompaction(). What actually happened is that, within method forceMajorCompaction(), this object (ColumnFamilyStore) is passed to CompactionManager singleton to perform an operation known as maximal.

Within CompactionManager class, the object cfStore is perform concurrently. It does by submit the cfStore object to a concurrent codes. To explain better, let's read general compaction framework below:
public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
// acquire the write lock long enough to schedule all sstables
compactionLock.writeLock().lock();
try
{
if (!cfStore.isValid())
return this;
AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
{
if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
return this;
try
{
// downgrade the lock acquisition
compactionLock.readLock().lock();
compactionLock.writeLock().unlock();
try
{
return task.execute(executor);
}
finally
{
compactionLock.readLock().unlock();
}
}
finally
{
task.unmarkSSTables();
}
}
}
finally
{
// we probably already downgraded
if (compactionLock.writeLock().isHeldByCurrentThread())
compactionLock.writeLock().unlock();
}
return this;
}
};
return executor.submit(callable);
}

To summarize :

  • compaction write lock is made.

  • cfStore object is check again if it still valid.

  • the compaction strategy is retrieved from the cfStore object.

  • mark SSTables for compaction.

  • execute on the CompactionExecutor.


Currently there are two types of compaction strategy in this version; SizeTieredCompactionStrategy and LeveledCompactionStrategy and this discussion continue based on SizeTieredCompactionStrategy.

The real compaction work is done here.
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null;

Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
if (!isCompactionInteresting(toCompact))
return 0;

if (compactionFileLocation == null)
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
if (partialCompactionsAcceptable())
{
// If the compaction file path is null that means we have no space left for this compaction.
// Try again w/o the largest one.
if (compactionFileLocation == null)
{
while (compactionFileLocation == null && toCompact.size() > 1)
{
logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
// Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all
// the sstables at the end.
toCompact.remove(cfs.getMaxSizeFile(toCompact));
compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
}
}

if (compactionFileLocation == null)
{
logger.warn("insufficient space to compact even the two smallest files, aborting");
return 0;
}
}

if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

// sanity check: all sstables must belong to the same cfs
for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);

CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
logger.info("Compacting {}", toCompact);

long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;

long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + keysPerSSTable);

AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
: new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

if (collector != null)
collector.beginCompaction(ci);
try
{
if (!nni.hasNext())
{
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
cfs.markCompacted(toCompact);
return 0;
}

SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
if (row.isEmpty())
continue;

long position = writer.append(row);
totalkeysWritten++;

if (DatabaseDescriptor.getPreheatKeyCache())
{
for (SSTableReader sstable : toCompact)
{
if (sstable.getCachedPosition(row.key, false) != null)
{
cachedKeys.put(row.key, position);
break;
}
}
}
if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
{
SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cachedKeyMap.put(toIndex, cachedKeys);
sstables.add(toIndex);
if (nni.hasNext())
{
writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
writers.add(writer);
cachedKeys = new HashMap<DecoratedKey, Long>();
}
}
}
}
catch (Exception e)
{
for (SSTableWriter writer : writers)
writer.abort();
throw FBUtilities.unchecked(e);
}
finally
{
iter.close();
if (collector != null)
collector.finishCompaction(ci);
}

cfs.replaceCompactedSSTables(toCompact, sstables);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
{
SSTableReader key = ssTableReaderMapEntry.getKey();
for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
key.cacheKey(entry.getKey(), entry.getValue());
}

long dTime = System.currentTimeMillis() - startTime;
long startsize = SSTable.getTotalBytes(toCompact);
long endsize = SSTable.getTotalBytes(sstables);
double ratio = (double)endsize / (double)startsize;

StringBuilder builder = new StringBuilder();
builder.append("[");
for (SSTableReader reader : sstables)
builder.append(reader.getFilename()).append(",");
builder.append("]");

double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
return toCompact.size();
}

That's a lot of works done in this method. :-) I summarized some important points below:

  • checking if enough sstables are present to compact.

  • check if the disk size is suffcient for this compaction task.

  • snapshot before compaction happen.

  • check sstable to be compact is belong to the same column family.

  • CompactionExecutorStatsCollector begin compaction with the AbstractCompactionIterable.

  •  create a compaction writer.

  •  replace a new compacted sstable with the old sstables.


I hope you enjoy this writing.

Thursday, January 16, 2014

force sstable compaction through jconsole

Happy new year everyone, this is my first article for 2014 and as a start, it is going to be short and sweet one. :-)

I was working on this project where I have deleted the column and I remember the definition of tombstone is that, you need to run compact so that the tombstone is removed. When I run nodetool compact, this message appear below.
INFO 19:15:09,170 Nothing to compact in index1.  Use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)

So what is this means is that, you need to have jconsole running because forceUserDefinedCompaction can only invoke through jconsole. When jconsole is connected to your cassandra daemon process, you need to navigate to the compactionmanager mbean.

Then you need to provide two value to this method as this method expected two parameters. That is the keyspace and the sstables where you wish to compact. You can see in this attachment on how I did it.



So if you ls the data directory where the sstable are store, a new sstable should be generated. Once the operation is complete, you can navigate to the data directory where the sstable is stored.

$ ls data/lucene1/
index1-hc-3-Data.db index1-hc-3-Digest.sha1 index1-hc-3-Filter.db index1-hc-3-Index.db index1-hc-3-Statistics.db snapshots

We have production machine which a single sstable size is more than 25GB. When the soft limit compaction is met (default min 4 and max 32) , in this situation, I think it will load the server if it takes place, probably best is, we forceUserDefinedCompaction on this single large sstable.