INFO [ScheduledTasks:1] 2014-04-17 14:18:00,079 StatusLogger.java (line 65) ReplicateOnWriteStage 17 17 0
StatusLogger starts writing the node's thread pool statistics to the Cassandra system.log under two conditions:
- When messages are dropped on the node.
- When a single GC collection takes longer than one second.
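The two triggers above can be sketched as a single check. This is a minimal illustration, not Cassandra's code: the class name `StatusLogTrigger`, the method `shouldLogStatus`, and its parameters are hypothetical, and the one-second threshold is taken from the description above.

```java
// Hypothetical helper, not part of Cassandra: mirrors the two triggers described in the article.
final class StatusLogTrigger {
    // One second, as stated in the article for the GC duration threshold.
    static final long GC_THRESHOLD_MILLIS = 1000;

    // Returns true when either trigger fires: dropped messages, or a GC pause longer than the threshold.
    static boolean shouldLogStatus(int droppedMessages, long gcDurationMillis) {
        return droppedMessages > 0 || gcDurationMillis > GC_THRESHOLD_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(shouldLogStatus(0, 250));   // neither trigger fires
        System.out.println(shouldLogStatus(3, 250));   // messages were dropped
        System.out.println(shouldLogStatus(0, 1500));  // GC took longer than a second
    }
}
```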
Either condition suggests that the node is under stress. As you may have noticed in system.log, many stages are involved; this article focuses on the metrics for Stage.REPLICATE_ON_WRITE.
What is the replicate on write stage? The code description reads: "Replicate every counter update from the leader to the follower replicas. Accepts the values true and false." Beyond that description, we are going to understand this stage by studying the code.
There are 11 stages involved. When the CassandraDaemon class starts up, StageManager is called and the stages are initialized. Of course, Stage.REPLICATE_ON_WRITE is one of them. A JMXConfigurableThreadPoolExecutor object configured with 32 threads and a 60-second keep-alive is initialized, and at that point the object is also registered with the MBean server.
Apparently the replicate on write stage is only triggered by column families of type counter, and the code snippet below is the only code that increments the replicate on write metrics.
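To make the initialization step more concrete, here is a rough sketch using only JDK classes. It is not Cassandra's JMXConfigurableThreadPoolExecutor: the class `StageSketch`, the interface `PoolStats`, and the `sketch.stages` JMX domain are made up for illustration, and approximating pending tasks by the queue length is an assumption.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

final class StageSketch {
    // Hypothetical management interface; stands in for the MBean interfaces named in the article.
    public interface PoolStats {
        int getActiveCount();
        long getCompletedTasks();
        long getPendingTasks();
    }

    // Builds a fixed-size pool whose idle threads may time out after the keep-alive period.
    static ThreadPoolExecutor newStage(int threads, long keepAliveSeconds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    // Registers a read-only view of the pool with the platform MBean server.
    static ObjectName register(final ThreadPoolExecutor pool, String stageName) {
        PoolStats stats = new PoolStats() {
            public int getActiveCount() { return pool.getActiveCount(); }
            public long getCompletedTasks() { return pool.getCompletedTaskCount(); }
            // Assumption: pending is approximated by the number of queued tasks.
            public long getPendingTasks() { return pool.getQueue().size(); }
        };
        try {
            ObjectName name = new ObjectName("sketch.stages:type=" + stageName);
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            server.registerMBean(new StandardMBean(stats, PoolStats.class), name);
            return name;
        } catch (Exception e) {
            throw new IllegalStateException("could not register " + stageName, e);
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor stage = newStage(32, 60);
        ObjectName name = register(stage, "ReplicateOnWriteStage");
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println("ActiveCount = " + server.getAttribute(name, "ActiveCount"));
        stage.shutdown();
    }
}
```

Once registered, the attributes can be browsed with any JMX client such as jconsole.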
private static Runnable counterWriteTask(final IMutation mutation,
                                         final Collection<InetAddress> targets,
                                         final IWriteResponseHandler responseHandler,
                                         final String localDataCenter,
                                         final ConsistencyLevel consistency_level)
{
    return new DroppableRunnable(StorageService.Verb.MUTATION)
    {
        public void runMayThrow() throws IOException
        {
            assert mutation instanceof CounterMutation;
            final CounterMutation cm = (CounterMutation) mutation;
            // apply mutation
            cm.apply();
            responseHandler.response(null);
            // then send to replicas, if any
            targets.remove(FBUtilities.getBroadcastAddress());
            if (cm.shouldReplicateOnWrite() && !targets.isEmpty())
            {
                // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
                // and we want to avoid blocking too much the MUTATION stage
                StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
                {
                    public void runMayThrow() throws IOException, TimeoutException
                    {
                        // send mutation to other replica
                        sendToHintedEndpoints(cm.makeReplicationMutation(), targets, responseHandler, localDataCenter, consistency_level);
                    }
                });
            }
        }
    };
}
Whenever the ThreadPoolExecutor executes a DroppableRunnable object, the task is run by a thread from the executor's thread pool.
The interface IExecutorMBean exposes three metrics:
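The hand-off pattern in the snippet, where a task running on one stage submits follow-up work to a second stage so the first is not blocked, can be sketched with plain JDK executors. This is only an illustration under stated assumptions: the class `StageHandOffSketch` and the two pool variables are hypothetical stand-ins, and the "work" is just a recorded string.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StageHandOffSketch {
    // Runs one task on a "mutation" pool that hands follow-up work to a second pool.
    static List<String> run() {
        ExecutorService mutationStage = Executors.newFixedThreadPool(2);
        ExecutorService replicateOnWriteStage = Executors.newFixedThreadPool(2);
        List<String> events = new CopyOnWriteArrayList<String>();
        CountDownLatch replicated = new CountDownLatch(1);

        mutationStage.execute(() -> {
            // Step 1: apply the change locally on the first stage.
            events.add("applied locally");
            // Step 2: push the slower replication work onto the second stage.
            replicateOnWriteStage.execute(() -> {
                events.add("sent to replicas");
                replicated.countDown();
            });
        });

        try {
            // Wait for the second stage to finish before shutting both pools down.
            replicated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mutationStage.shutdown();
        replicateOnWriteStage.shutdown();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```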
- getActiveCount
- getCompletedTasks
- getPendingTasks
and the interface JMXEnabledThreadPoolExecutorMBean exposes two more:
- getTotalBlockedTasks
- getCurrentlyBlockedTasks
StatusLogger.log reports getActiveCount, getPendingTasks and getCurrentlyBlockedTasks, hence the three columns per stage in the system.log output.
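As a rough illustration of how such a three-column line could be produced, the sketch below formats a pool name followed by active, pending and blocked counts. The class `StatusLineSketch`, its methods, and the column widths are assumptions made for this example. Pending is approximated by the queue length and blocked is fixed at zero, which may differ from how Cassandra computes them.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class StatusLineSketch {
    // Formats one line: pool name, then active, pending and blocked counts in fixed-width columns.
    static String formatLine(String poolName, long active, long pending, long blocked) {
        return String.format("%-25s%10d%10d%10d", poolName, active, pending, blocked);
    }

    // Reads the counters from a plain JDK pool (assumptions: pending = queued tasks, blocked = 0).
    static String describe(String poolName, ThreadPoolExecutor pool) {
        long active = pool.getActiveCount();
        long pending = pool.getQueue().size();
        long blocked = 0; // a plain ThreadPoolExecutor does not track blocked submitters
        return formatLine(poolName, active, pending, blocked);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        System.out.println(describe("ReplicateOnWriteStage", pool));
        pool.shutdown();
    }
}
```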
getActiveCount
The active count is actually implemented in the ThreadPoolExecutor class. Whenever a worker is running a task, that task is considered active and adds one to the count.
getCompletedTasks
The completed task count is a wrapper around ThreadPoolExecutor.getCompletedTaskCount(). Whenever a worker finishes executing a task, the count goes up by one.
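The active and completed counters described above can be observed directly on a plain JDK ThreadPoolExecutor. The sketch below (the class `PoolCountersSketch` is hypothetical) holds one task open with a latch, samples both counters while it runs, then samples them again after the pool has terminated.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolCountersSketch {
    // Returns {active while running, completed while running, active after, completed after}.
    static long[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        long[] seen = new long[4];
        try {
            pool.execute(() -> {
                started.countDown();          // signal that a worker is now running this task
                try {
                    release.await();          // stay "active" until the main thread lets go
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            seen[0] = pool.getActiveCount();
            seen[1] = pool.getCompletedTaskCount();
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            seen[2] = pool.getActiveCount();
            seen[3] = pool.getCompletedTaskCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```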
getTotalBlockedTasks
When a DebuggableThreadPoolExecutor object is initialized, a rejected execution handler is set. Whenever the ThreadPoolExecutor rejects a command, rejectedExecution() is triggered, so each rejection adds one to the count.
That's about it for this article. While studying this code and writing this article, I was amazed at how the code is structured and how complex it is. I would really recommend studying ThreadPoolExecutor.java, as the Cassandra stages reference this code throughout.
Last but not least, if you enjoyed reading this and learned something, please remember to donate too.
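The counting idea can be sketched with a JDK RejectedExecutionHandler that increments a counter each time the executor turns a task away. This is a simplified stand-in, not Cassandra's handler: the class `RejectionCountSketch` is hypothetical, the pool is deliberately tiny (one thread, one queue slot) so that rejections are easy to provoke, and rejected tasks are simply dropped here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class RejectionCountSketch {
    // Submits the given number of blocking tasks and returns how many were rejected.
    static long countRejections(int submissions) {
        AtomicLong totalRejected = new AtomicLong();
        // One rejection equals one count; the rejected task is dropped in this sketch.
        RejectedExecutionHandler handler = (task, executor) -> totalRejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 0; i < submissions; i++) {
            // The first task occupies the only thread, the second fills the queue,
            // and every further submission is rejected until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return totalRejected.get();
    }

    public static void main(String[] args) {
        System.out.println(countRejections(5));
    }
}
```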