Sunday, May 25, 2014

Learning continuous integration with Jenkins

Until now, I have been testing with JUnit tests, manual testing, test scripts, or tests triggered from Maven. But as many have mentioned, and as my buddy also recommended, it is worth looking into Jenkins, a continuous integration server for software development. Today, we are going to look into it.

So what is jenkins?

Jenkins is an open source continuous integration tool written in Java. The project was forked from Hudson after a dispute with Oracle. Jenkins provides continuous integration services for software development. It is a server-based system running in a servlet container such as Apache Tomcat. It supports SCM tools including AccuRev, CVS, Subversion, Git, Mercurial, Perforce, Clearcase and RTC, and can execute Apache Ant and Apache Maven based projects as well as arbitrary shell scripts and Windows batch commands. The primary developer of Jenkins is Kohsuke Kawaguchi.[2] Released under the MIT License, Jenkins is free software.[3]

Before we continue, let's understand the terminology of Jenkins. Below are the terms you will come across when you start with Jenkins; for the full list, please check out the link.

Job: A runnable task that is controlled / monitored by Jenkins.
Completed build: A build is completed if it was started and finished with any result, including failed builds.
Successful build: A build is successful when the compilation reported no errors.
Node: Represents the physical computer running

I guess the simplest way to get started with Jenkins is to run this command from a terminal:
$ java -jar jenkins.war

However, you can also deploy it within a servlet container like Apache Tomcat. Once started, just point your browser to Jenkins at localhost:8080. If the default port 8080 is not available, you can specify another one with --httpPort, and you can find the other parameters using --help.

You can also install Jenkins via your distribution's packages, e.g.
rpm based distribution
deb based distribution

This article continues with the simple setup. Jenkins stores its files in $HOME/.jenkins. Right now we will create a simple project.

1. click on 'New Item' on the left navigation button.
2. add your project name. example videoOnCloud
3. select 'Build a free-style software project'
4. configure the project. See the attachment.

As a start, let's not configure any version control system but just a simple script. We want to learn how Jenkins performs its duty. As seen here, there are a few variables configured, and if you click on the link 'See the list of available environment variables', it should explain the parameters that I have configured. Go to the landing page and click 'Build Now'; you should get a blue circle! Of course, you can configure the build process using Maven, Ant or Windows batch commands. You can also trigger a remote build by issuing an HTTP GET to the link http://localhost:8080/job/videoOnCloud/build
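The remote trigger mentioned above can also be fired from code. Below is a minimal sketch in Java, assuming the Jenkins instance from this article is running on localhost:8080 with no security enabled. The class and method names (JenkinsRemoteBuild, buildUrl, trigger) are made up for illustration, and depending on the Jenkins version and its security settings, the server may ask for authentication or a POST request instead of a plain GET.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of Jenkins: triggers a job via its build URL.
class JenkinsRemoteBuild {

    // Builds <base>/job/<name>/build. Assumes the job name needs no URL encoding.
    static String buildUrl(String baseUrl, String jobName) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return base + "/job/" + jobName + "/build";
    }

    // Sends an HTTP GET to the build URL and returns the HTTP status code.
    static int trigger(String baseUrl, String jobName) throws IOException {
        URL url = new URL(buildUrl(baseUrl, jobName));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            int status = trigger("http://localhost:8080", "videoOnCloud");
            System.out.println("HTTP status: " + status);
        } catch (IOException e) {
            // Jenkins is not reachable, e.g. it is not running on this machine.
            System.out.println("Could not reach Jenkins: " + e.getMessage());
        }
    }
}
```

A status in the 2xx or 3xx range usually means the request was accepted; the build itself still shows up on the job page.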

So that's it. You should now have a very basic understanding of using Jenkins. Next, try enabling a version control system in your build configuration.

Saturday, May 24, 2014

Load balancing policy in datastax java driver

Today we are going to explore LoadBalancingPolicy in datastax java driver for apache cassandra.

So what is a load balancing policy in the DataStax Java driver? From the code description:

The policy that decides which Cassandra hosts to contact for each new query.

Two methods need to be implemented:

  • LoadBalancingPolicy.distance : returns the "distance" of an host for that balancing policy.

  • LoadBalancingPolicy.newQueryPlan: it is used for each query to find which host to query first, and which hosts to use as failover.

The LoadBalancingPolicy is a com.datastax.driver.core.Host.StateListener and is thus informed of hosts up/down events. For efficiency purposes, the policy is expected to exclude down hosts from query plans.

The default policy for Java driver version 2.0.2 is TokenAwarePolicy with DCAwareRoundRobinPolicy as its child policy.

Below is a list of the policies available in this version of the driver.


RoundRobinPolicy

This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the next one (following the round-robin order) is tried, until all hosts have been tried. This policy is not datacenter aware and will include every known Cassandra host in its round robin algorithm. If you use multiple datacenters this will be inefficient and you will want to use the DCAwareRoundRobinPolicy load balancing policy instead.


DCAwareRoundRobinPolicy

This policy provides round-robin queries over the nodes of the local data center. It also includes in the query plans returned a configurable number of hosts in the remote data centers, but those are always tried after the local nodes. In other words, this policy guarantees that no host in a remote data center will be queried unless no host in the local data center can be reached.

If used with a single data center, this policy is equivalent to the RoundRobin policy, but its DC awareness incurs a slight overhead so the RoundRobin policy could be preferred to this policy in that case.
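To make the ordering concrete, here is a small sketch of how such a query plan could be put together: local hosts in round-robin order first, then a limited number of remote hosts. This is only an illustration in plain Java, not the driver's implementation; the class DcAwareRoundRobinSketch, its fields and the use of plain host name strings are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: hosts are plain strings, and down hosts are not tracked.
class DcAwareRoundRobinSketch {
    private final List<String> localHosts;
    private final List<String> remoteHosts;
    private final int usedRemoteHosts;               // how many remote hosts a plan may include
    private final AtomicInteger index = new AtomicInteger();

    DcAwareRoundRobinSketch(List<String> localHosts, List<String> remoteHosts, int usedRemoteHosts) {
        this.localHosts = new ArrayList<String>(localHosts);
        this.remoteHosts = new ArrayList<String>(remoteHosts);
        this.usedRemoteHosts = usedRemoteHosts;
    }

    // Each call starts one position further along, so load is spread over the local hosts.
    List<String> newQueryPlan() {
        int start = index.getAndIncrement() & Integer.MAX_VALUE; // stay non-negative on overflow
        List<String> plan = new ArrayList<String>();
        for (int i = 0; i < localHosts.size(); i++) {
            plan.add(localHosts.get((start + i) % localHosts.size()));
        }
        // Remote hosts always come after every local host.
        int remoteCount = Math.min(usedRemoteHosts, remoteHosts.size());
        for (int i = 0; i < remoteCount; i++) {
            plan.add(remoteHosts.get((start + i) % remoteHosts.size()));
        }
        return plan;
    }

    public static void main(String[] args) {
        DcAwareRoundRobinSketch policy = new DcAwareRoundRobinSketch(
                Arrays.asList("local1", "local2", "local3"),
                Arrays.asList("remote1", "remote2"), 1);
        System.out.println(policy.newQueryPlan()); // [local1, local2, local3, remote1]
        System.out.println(policy.newQueryPlan()); // [local2, local3, local1, remote2]
    }
}
```

With an empty remote list and all hosts in one data center, the plan degenerates to plain round robin, which matches the remark above about the single data center case.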


TokenAwarePolicy

This policy encapsulates another policy. The resulting policy works in the following way:

  • the distance method is inherited from the child policy.

  • the iterator returned by the newQueryPlan method will first return the LOCAL replicas for the query, based on Statement.getRoutingKey if possible (i.e. if the query's getRoutingKey method doesn't return null and if Metadata.getReplicas returns a non-empty set of replicas for that partition key). If no local replica can be either found or successfully contacted, the rest of the query plan will fall back to the one of the child policy.

Do note that only replicas for which the child policy's distance method returns HostDistance.LOCAL will be considered as having priority. For example, if you wrap DCAwareRoundRobinPolicy with this token aware policy, replicas from remote data centers may only be returned after all the hosts of the local data center.


WhiteListPolicy

A load balancing policy wrapper that ensures that only hosts from a provided white list will ever be returned.

This policy wraps another load balancing policy and will delegate the choice of hosts to the wrapped policy with the exception that only hosts contained in the white list provided when constructing this policy will ever be returned. Any host not in the white list will be considered IGNORED and thus will not be connected to.

This policy can be useful to ensure that the driver only connects to a predefined set of hosts. Keep in mind however that this policy defeats somewhat the host auto-detection of the driver. As such, this policy is only useful in a few special cases or for testing, but is not optimal in general. If all you want to do is limiting connections to hosts of the local data-center then you should use DCAwareRoundRobinPolicy and *not* this policy in particular.


LatencyAwarePolicy

A wrapper load balancing policy that adds latency awareness to a child policy.

When used, this policy will collect the latencies of the queries to each Cassandra node and maintain a per-node latency score (an average). Based on these scores, the policy will penalize (technically, it will ignore them unless no other nodes are up) the nodes that are slower than the best performing node by more than some configurable amount (the exclusion threshold).

The latency score for a given node is based on a form of exponential moving average.
In other words, the latency score of a node is the average of its previously measured latencies, but where older measurements get an exponentially decreasing weight. The exact weight applied to a newly received latency is based on the time elapsed since the previous measure (to account for the fact that latencies are not necessarily reported with equal regularity, neither over time nor between different nodes).

Once a node is excluded from query plans (because its averaged latency grew over the exclusion threshold), its latency score will not be updated anymore (since it is not queried). To give a chance to this node to recover, the policy has a configurable retry period. The policy will not penalize a host for which no measurement has been collected for more than this retry period.
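The scoring described above can be sketched as a time-weighted exponential moving average. The code below is my own illustration, not the driver's formula: the class LatencyScoreSketch, the scaleNanos parameter, the exp(-elapsed / scale) weighting, and treating the exclusion threshold as a multiplier of the best score are all assumptions chosen to keep the example short.

```java
// Illustration only: one instance keeps the latency score of one node.
class LatencyScoreSketch {
    private final double scaleNanos;     // assumed knob: how quickly old measurements fade
    private double average = -1;         // -1 means no measurement yet
    private long lastTimestampNanos;

    LatencyScoreSketch(double scaleNanos) {
        this.scaleNanos = scaleNanos;
    }

    // Folds a new latency into the score; the longer since the previous
    // measurement, the less weight the old average keeps.
    void record(long latencyNanos, long nowNanos) {
        if (average < 0) {
            average = latencyNanos;
        } else {
            long elapsed = Math.max(0, nowNanos - lastTimestampNanos);
            double oldWeight = Math.exp(-elapsed / scaleNanos);
            average = oldWeight * average + (1 - oldWeight) * latencyNanos;
        }
        lastTimestampNanos = nowNanos;
    }

    double average() {
        return average;
    }

    // A node is penalized when its score exceeds the best node's score times the
    // threshold, unless it has not been measured for longer than the retry period.
    boolean isPenalized(double bestAverage, double exclusionThreshold,
                        long nowNanos, long retryPeriodNanos) {
        if (average < 0) {
            return false; // never measured, so give it a chance
        }
        if (nowNanos - lastTimestampNanos > retryPeriodNanos) {
            return false; // stale score, let the node be retried
        }
        return average > bestAverage * exclusionThreshold;
    }

    public static void main(String[] args) {
        LatencyScoreSketch slowNode = new LatencyScoreSketch(100.0);
        slowNode.record(10, 0);
        slowNode.record(30, 100);
        System.out.println("score: " + slowNode.average());
        System.out.println("penalized now:   " + slowNode.isPenalized(10.0, 2.0, 100, 1000));
        System.out.println("penalized later: " + slowNode.isPenalized(10.0, 2.0, 2000, 1000));
    }
}
```

The last two lines show the retry period at work: the same score is penalized right after the measurement, but no longer once the measurement is older than the retry period.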


Of course, no single load balancing policy is perfect for every environment, so you should evaluate which policy suits your needs. Because of this, the policies will be fine-tuned and more may be added in the future, so always check the next release of the driver for updates.

Friday, May 23, 2014

Learning git remote

Hello everybody! Today, we will take a look at git remote. So why git remote? Ever wondered why, every time you push, you only have one place to push to? What if you want to push to a few servers? But before we push to a few servers, let's take a look at what git remote actually is.

From git remote documentation

git-remote - Manage set of tracked repositories

So let's explain using examples. Below is my git repository; we check which remotes we have in the current project.
$ git remote -v
origin (fetch)
origin (push)

Okay, so we have a remote repository named origin with its URLs for fetch and push. All clear: we are tracking the remote repository on GitHub. As you may notice, the name origin doesn't explain much, other than saying this is where it began. What if you want to use a more descriptive term?
$ git remote rename origin github
$ git remote -v
github (fetch)
github (push)

Okay, now it is very descriptive: our remote repository is github. Now, what if I would like to push to another remote server? Can I do it?
$ git remote add production
$ git remote -v
github (fetch)
github (push)
production (fetch)
production (push)

That looks pretty easy. But over time, you may forget which remote branches are being tracked.
$ git branch -r

So it is currently pointing to github/master. If you want to remove stale remote-tracking branches whose branch on the remote has been deleted, you can use git remote prune. Note that with --dry-run, nothing is actually removed; it just shows you what would be removed. If you are sure, just drop the --dry-run parameter.
$ git remote prune --dry-run github

To fetch updates for a named set of remotes in the repository, use remote update.
$ git remote -v update github
Fetching github
= [up to date] master -> github/master

To change the push URL of a remote repository, use set-url --push; without --push, the fetch URL is changed.
$ git remote set-url --push production
$ git remote -v
github (fetch)
github (push)
production (fetch)
production (push)

$ git remote set-url production
$ git remote -v
github (fetch)
github (push)
production (fetch)
production (push)

You can also add more URLs to a remote with set-url --add, and remove one with set-url --delete. If you notice, the additional URL is not shown here, but you can look at it in .git/config.
$ git remote set-url --add production
$ git remote -v
github (fetch)
github (push)
production (fetch)
production (push)
$ git remote set-url --delete production
$ git remote -v
github (fetch)
github (push)
production (fetch)
production (push)

Okay, because I don't host any remote production server, I will remove it.
$ git remote remove production
$ git remote -v
github (fetch)
github (push)


Sunday, May 18, 2014

Learning java native keyword

Today, while studying some Java code, the following snippet caught my attention.
/**
 * int openat(int dfd, const char* path, int oflag, mode_t mode)
 */
static int openat(int dfd, byte[] path, int flags, int mode) throws UnixException {
    // copy the path bytes into native memory so its address can be handed to C
    NativeBuffer buffer = NativeBuffers.asNativeBuffer(path);
    try {
        return openat0(dfd, buffer.address(), flags, mode);
    } finally {
        buffer.release();
    }
}
private static native int openat0(int dfd, long pathAddress, int flags, int mode) throws UnixException;

So what actually happens here? The static method openat is declared with package-private (default) access and converts the path into a native buffer before the private native method openat0 is called. Note that openat0 is not implemented in Java code.

native : Used in method declarations to specify that the method is not implemented in the same Java source file, but rather in another language.

Because a native method executes non-Java code, it must be implemented in a non-Java language. You can see how that is done in the hello world example here.
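To see the keyword in isolation, here is a small sketch that only declares a native method and inspects it with reflection. The class NativeKeywordSketch and the method add0 are made up for this example, and no native library implements add0, so the sketch deliberately never calls it; it only shows that the declaration compiles without a body and that the JVM marks the method as native.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical example class; add0 has no native implementation anywhere.
class NativeKeywordSketch {

    // Declared in Java without a body; the implementation would live in a native library.
    private static native int add0(int a, int b);

    // Package-private wrapper, mirroring the openat / openat0 pattern above.
    // Calling it here would throw UnsatisfiedLinkError, because no library
    // providing add0 has been loaded.
    static int add(int a, int b) {
        return add0(a, b);
    }

    // Reports whether the named method of this class carries the native modifier.
    static boolean isNative(String methodName, Class<?>... parameterTypes) {
        try {
            Method m = NativeKeywordSketch.class.getDeclaredMethod(methodName, parameterTypes);
            return Modifier.isNative(m.getModifiers());
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("add0 is native: " + isNative("add0", int.class, int.class)); // true
        System.out.println("add is native:  " + isNative("add", int.class, int.class));  // false
    }
}
```

The wrapper is an ordinary Java method; only the private add0 carries the native flag, which is the same split as between openat and openat0.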

The native openat0, however, ships in a precompiled shared object (soname) file, which depends on the JVM you are using. To illustrate with this example, look in the directory /usr/lib/jvm/jdk1.7.0_55/jre/lib/amd64/
$ ls /usr/lib/jvm/jdk1.7.0_55/jre/lib/amd64/
jli server
jvm.cfg xawt

As you may notice, many precompiled shared object files come with the JRE. To check which one is loaded,
static {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
return null;
int flags = init();

hasAtSysCalls = (flags & HAS_AT_SYSCALLS) > 0;

The library is obviously loaded there, and to read the content of the shared object file,
$ objdump -T /usr/lib/jvm/jdk1.7.0_55/jre/lib/amd64/ | grep openat
000000000000c350 g DF .text 00000000000000ac SUNWprivate_1.1 Java_sun_nio_fs_UnixNativeDispatcher_openat0
$ nm -D /usr/lib/jvm/jdk1.7.0_55/jre/lib/amd64/ | grep openat0
000000000000c350 T Java_sun_nio_fs_UnixNativeDispatcher_openat0

So that's it, I hope you learned java native keyword too!

Saturday, May 17, 2014

Getting familiar with Java FileChannel

When I was studying the Lucene 4.8.0 codebase, one particular thing I stumbled upon was the use of FileChannel. So today, I'm spending time playing around with this class.

So you would ask, why use FileChannel instead of BufferedWriter?

From the FileChannel documentation:

In addition to the familiar read, write, and close operations of byte channels, this class defines the following file-specific operations:

Bytes may be read or written at an absolute position in a file in a way that does not affect the channel's current position.

A region of a file may be mapped directly into memory; for large files this is often much more efficient than invoking the usual read or write methods.

Updates made to a file may be forced out to the underlying storage device, ensuring that data are not lost in the event of a system crash.

Bytes can be transferred from a file to some other channel, and vice versa, in a way that can be optimized by many operating systems into a very fast transfer directly to or from the filesystem cache.

A region of a file may be locked against access by other programs.

That sounds interesting, and to understand it better, we will write some code using the FileChannel class. Below is what I wrote; the explanation comes after.
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class FileChannelTest {

    public static void main(String[] args) throws IOException {

        try {
            File aFile = new File("test.txt");
            FileChannel fc = FileChannel.open(aFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
            System.out.println("initialized is Open " + fc.isOpen()); // true

            String data = "hello orld";
            ByteBuffer buf = ByteBuffer.allocate(data.length());
            buf.put(data.getBytes());
            buf.flip(); // switch the buffer from filling to draining

            System.out.println("initial size " + fc.size()); // 0
            System.out.println("initial position " + fc.position()); // 0
            fc.write(buf); // relative write, advances the channel position
            System.out.println("after write size " + fc.size()); // 10
            System.out.println("after write position " + fc.position()); //10

            ByteBuffer dst = ByteBuffer.allocate(200);
            fc.read(dst, 0);
            System.out.println("initial read " + new String(dst.array(), "UTF-8")); // hello orld

            ByteBuffer newData = ByteBuffer.wrap("world\ndelete me".getBytes());
            fc.write(newData, 6); // positional write, channel position stays at 10
            fc.position(fc.size()); // move to the end of the file (21)

            dst.clear();
            fc.read(dst, 0);
            System.out.println("read second write " + new String(dst.array(), "UTF-8")); //hello world
            //delete me
            System.out.println("after second write size " + fc.size()); // 21
            System.out.println("after second write pos " + fc.position()); // 21

            fc.truncate(12); // keep only "hello world\n"
            System.out.println("after truncate size " + fc.size()); // 12
            System.out.println("after truncate pos " + fc.position()); // 12
            dst = ByteBuffer.allocate(200);
            fc.read(dst, 0);
            System.out.println("after truncate " + new String(dst.array(), "UTF-8"));

            newData = ByteBuffer.wrap("a new line of text\n".getBytes());
            fc.write(newData); // position is at the end, so this appends
            System.out.println("after second write size " + fc.size()); // 31
            System.out.println("after second write pos " + fc.position()); // 31

            fc.force(true); // flush data and metadata to the storage device
            fc.close();
            System.out.println("after close " + fc.isOpen()); // false

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


It's a simple single-threaded class. We start with a File object for a test file. Then we create a FileChannel object that creates test.txt if it does not exist and opens it for writing and reading.

To dip our toe into the water, we start by checking if the file channel is open. In order to write, we need a ByteBuffer. We construct a new ByteBuffer with the data length and put the data into the buffer. Before the channel can write this ByteBuffer, you must call flip() on it so that the buffer's position is reset to the start of the data.

We check the current FileChannel size and position. Initially, they are zero. After the data is written, note that size and position have both increased to 10. To check what was written to the file channel, we can invoke the method read(), hence the next few statements in the code.

To hold the data read from the file channel, we create a new ByteBuffer called dst with a capacity of 200, so it can hold 200 bytes of data. We read from the file channel starting at file position 0 into dst. As expected in the printout, hello orld was written to the file. Interesting! It also means we can write at a position we specify, and that is exactly what the next few lines of code do: we write "world\ndelete me" using the write method with a position specified. If you look carefully, unlike the relative write(ByteBuffer), the positional write(ByteBuffer, long) does not update the file channel position, hence we set it explicitly to 21, that is, the end of the file. With this second write, we change the original data (hello orld) to (hello world\ndelete me); it overwrites the existing bytes from position 6 onward rather than appending.
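The difference between the two write methods is easy to miss, so here is a smaller sketch that isolates it. It is a separate example, not part of the listing above: the class name PositionalWriteSketch is made up, and it works on a temporary file that it deletes afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical example class showing relative versus positional writes.
class PositionalWriteSketch {

    // Returns {position after relative write, position after positional write, final size}.
    static long[] run() {
        try {
            Path file = Files.createTempFile("filechannel-sketch", ".txt");
            try (FileChannel fc = FileChannel.open(file,
                    StandardOpenOption.WRITE, StandardOpenOption.READ)) {
                // Relative write: starts at the channel position and advances it.
                fc.write(ByteBuffer.wrap("hello orld".getBytes(StandardCharsets.UTF_8)));
                long afterRelative = fc.position();

                // Positional write: starts at byte 6 and leaves the channel position alone.
                fc.write(ByteBuffer.wrap("world\ndelete me".getBytes(StandardCharsets.UTF_8)), 6);
                long afterPositional = fc.position();

                return new long[] { afterRelative, afterPositional, fc.size() };
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = run();
        System.out.println("position after relative write   " + result[0]); // 10
        System.out.println("position after positional write " + result[1]); // 10
        System.out.println("final size                      " + result[2]); // 21
    }
}
```

The file grows to 21 bytes while the position stays at 10, which is why the listing above has to move the position itself before appending.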

We check the file channel after the second write: the typo in the string is corrected, and the position and size are as expected (21). Now we truncate the file channel to 12 bytes. That is, the first 12 bytes, containing "hello world\n", survive and the rest is discarded. The remaining size is 12, and the position is set to 12 as well. As verified from the printout, "delete me" no longer exists in the file channel.

As the file position is at the end of the file, we can simulate an append by just writing, and that is exactly what the next lines of code do. To ensure data and metadata are flushed to the block device, we call force with the parameter true. We end this example by invoking the close method and checking that the file channel is no longer open.

When the file channel object is created, we add the open option StandardOpenOption.READ because it has to be set in order to read from the file channel, or else you will get an exception. That's it about learning FileChannel.

Friday, May 16, 2014

Learn and experiment with cassandra trigger

In Cassandra 2.0, experimental triggers were introduced, and this seems an exciting way to bring Cassandra to a whole new level. Today, using Cassandra 2.0.7, we are going to learn about Cassandra triggers. But first, let's understand what a conventional database trigger is.

Excerpt from wikipedia,

A database trigger is procedural code that is automatically executed in response to certain events on a particular table or view in a database. The trigger is mostly used for maintaining the integrity of the information on the database. For example, when a new record (representing a new worker) is added to the employees table, new records should also be created in the tables of the taxes, vacations and salaries.

So let's create a table in Cassandra and then create a trigger for the table. We will do this via cqlsh, and the example we are going to follow is available in this link. Below are the steps I took while studying the example trigger code.

1. build cassandra jar files in cassandra base directory.
2. build trigger-example.jar from trigger example directory.
3. upload trigger-example.jar to cassandra node directory in /etc/cassandra/triggers
4. copy the properties file to cassandra node directory in /etc/cassandra/
5. make cassandra aware of this jar and properties file addition via nodetool reloadtriggers
nodetool -h localhost reloadtriggers
6. repeat step 3 and 4 for all the nodes in the cluster.
7. create column family invertedindex via cqlsh.
8. create column family standard1 via cqlsh.
9. create trigger via cqlsh CREATE TRIGGER test1 ON "Keyspace1"."Standard1" USING 'org.apache.cassandra.triggers.InvertedIndex';
note that you can also drop the trigger via the command drop trigger test1 on "Keyspace1"."Standard1"

Here comes the exciting part. When I tried to insert, the response kept complaining that the key may not be empty. It is strange, because we do specify user_id as our key, yet it keeps giving this error. So what went wrong?
cqlsh:keyspace1> insert into standard1 (user_id, age) values (124, 11);
Bad Request: Key may not be empty

TRACE [Thrift:5] 2014-05-12 22:17:02,492 (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@164b11c @CL.ONE
DEBUG [Thrift:5] 2014-05-12 22:17:02,493 (line 159) request complete
ERROR [Thrift:5] 2014-05-12 22:17:02,493 (line 219) Error occurred during processing of message.
java.lang.RuntimeException: Exception while creating trigger on CF with ID: d04577ab-ecc0-3f57-bb01-6febc9d27803
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(
at org.apache.cassandra.triggers.TriggerExecutor.execute(
at org.apache.cassandra.service.StorageProxy.mutateWithTriggers(
at org.apache.cassandra.cql3.statements.ModificationStatement.executeWithoutCondition(
at org.apache.cassandra.cql3.statements.ModificationStatement.execute(
at org.apache.cassandra.cql3.QueryProcessor.processStatement(
at org.apache.cassandra.cql3.QueryProcessor.process(
at org.apache.cassandra.thrift.CassandraServer.execute_cql3_query(
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(
at org.apache.thrift.ProcessFunction.process(
at org.apache.thrift.TBaseProcessor.process(
at org.apache.cassandra.thrift.CustomTThreadPoolServer$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
Caused by: java.lang.NullPointerException
at org.apache.cassandra.db.RowMutation.addOrGet(
at org.apache.cassandra.db.RowMutation.addOrGet(
at org.apache.cassandra.db.RowMutation.addOrGet(
at org.apache.cassandra.db.RowMutation.add(
at org.apache.cassandra.db.RowMutation.add(
at org.apache.cassandra.triggers.InvertedIndex.augment(
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(
... 15 more
TRACE [Thrift:5] 2014-05-12 22:17:02,495 (line 74) ClientState removed for socket ad

So I decided to dig further, and I got it to work after spending hours. The changes are below.

1. change to lower letters for
$ cat /etc/cassandra/

2. rebuild trigger-example.jar with a different augment method implementation; remember to deploy this to every node in the cluster and run reloadtriggers using nodetool.
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.ArrayBackedSortedColumns;
import java.util.Collections;

public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
{
    // start from an empty column family with the same metadata as the update
    ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
    // add column v2 with the value 999 to the row being written
    extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
                                     ByteBufferUtil.bytes(999)));
    RowMutation rm = new RowMutation("keyspace1", key);
    rm.add(extraUpdate);
    return Collections.singletonList(rm);
}

3. drop both column families and recreate them; below are the schemas.
cqlsh:keyspace1> desc table invertedindex;

CREATE TABLE invertedindex (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

cqlsh:keyspace1> desc table test_table;

CREATE TABLE test_table (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

And now when I insert into the column family again, voila, 999 is automatically written to v2, and there are no more exceptions in the log or the cqlsh output!
cqlsh:keyspace1> select * from test_table;

(0 rows)

cqlsh:keyspace1> insert into test_table (k, v1) values (0, 0);
cqlsh:keyspace1> select * from test_table;

k | v1 | v2
0 | 0 | 999

(1 rows)

The conclusion we can draw is that, since triggers are experimental, they are subject to many changes in the future, including the API, and the chance that they fail is higher ;-). For the time being, building a trigger also needs Cassandra and Java knowledge. Thus, you should not use this in production, but that does not mean you cannot try the feature. In fact, the Cassandra project would like to receive feedback on triggers to improve them and make them production-ready in the future.

That's it for this article. If you like it, please go to the donation page to contribute back, as funding will allow us to keep writing in the future.

Sunday, May 11, 2014

Store video on cassandra and using hector streaming IO

Today, we are going to learn how to stream data in and out using hector-client. There are two classes in hector-client that store and read binary data in chunks. That's pretty neat! The two classes are

ChunkOutputStream: stores binary data as a blob in cassandra.
ChunkInputStream: reads binary data as a blob from cassandra.

Below is a test case that uses the two classes to show how to store and read data.
import static org.junit.Assert.*;

import java.util.Arrays;

import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.KsDef;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class HectorStreamTest {

private Keyspace keyspace;
private ThriftCluster cassandraCluster;
private CassandraHostConfigurator cassandraHostConfigurator;
protected HConnectionManager connectionManager;
public static KeyspaceDefinition KEYSPACE_DEV;
public final static String KEYSPACE = "TestKeyspace";
public final static String BLOB_CF = "Blob";
public final static CfDef BLOB_CF_DEF = new CfDef(KEYSPACE, BLOB_CF);

public static void setUpBeforeClass() throws Exception {

public static void tearDownAfterClass() throws Exception {

public void setUp() throws Exception {
cassandraHostConfigurator = new CassandraHostConfigurator(
connectionManager = new HConnectionManager("just4fun",

KEYSPACE_DEV = new ThriftKsDef(new KsDef(KEYSPACE,
Arrays.asList(new CfDef[] { BLOB_CF_DEF })));
((ThriftKsDef) KEYSPACE_DEV).setReplicationFactor(1);
cassandraCluster = new ThriftCluster("just4fun",

keyspace = HFactory.createKeyspace(KEYSPACE, cassandraCluster);

cassandraCluster.addKeyspace(KEYSPACE_DEV, true);

public void tearDown() throws Exception {

    @Test
    public void testWriteAndReadStream() throws IOException {
        byte[] value = "hello world, store and read binary as a chunk of blob in and from cassandra.".getBytes();

        // write to cassandra.
        ChunkOutputStream<String> out = new ChunkOutputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get(), 2);
        out.write(value);
        out.close(); // closing flushes the remaining data to cassandra

        // read from cassandra.
        ChunkInputStream<String> in = new ChunkInputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get());
        int i = -1;
        int written = 0;

        // read() returns one byte at a time, or -1 at the end of the stream
        while ((i = in.read()) != -1) {
            assertSame(value[written++], (byte) i);
            byte[] b = {(byte)i};
            System.out.print(new String(b));
        }
    }
}


Keyspace TestKeyspace is created and the table Blob is used to write and read the blob data. The main point is probably the chunk size in ChunkOutputStream; it is set to 2, but you can give another even number. Remember, each byte is represented by two hexadecimal characters; see the cqlsh output below for more information. The test method testWriteAndReadStream() stores the data in the variable value using ChunkOutputStream.write(); remember to close the stream so that the data is actually flushed to cassandra, or else it will stay in the client. To read from cassandra, specify the row to ChunkInputStream and call the method read(), which reads the data back chunk by chunk. When the test is done, the keyspace is removed.
cqlsh:TestKeyspace> select * from "Blob";

key | column1 | value
0x726f7731 | 0x0000000000000000 | 0x6800
0x726f7731 | 0x0000000000000001 | 0x6500
0x726f7731 | 0x0000000000000002 | 0x6c00
0x726f7731 | 0x0000000000000003 | 0x6c00
0x726f7731 | 0x0000000000000004 | 0x6f00
0x726f7731 | 0x0000000000000005 | 0x2000
0x726f7731 | 0x0000000000000006 | 0x7700
0x726f7731 | 0x0000000000000007 | 0x6f00
0x726f7731 | 0x0000000000000008 | 0x7200
0x726f7731 | 0x0000000000000009 | 0x6c00
0x726f7731 | 0x000000000000000a | 0x6400
0x726f7731 | 0x000000000000000b | 0x2c00
0x726f7731 | 0x000000000000000c | 0x2000
0x726f7731 | 0x000000000000000d | 0x7300
0x726f7731 | 0x000000000000000e | 0x7400
0x726f7731 | 0x000000000000000f | 0x6f00
0x726f7731 | 0x0000000000000010 | 0x7200
0x726f7731 | 0x0000000000000011 | 0x6500
0x726f7731 | 0x0000000000000012 | 0x2000
0x726f7731 | 0x0000000000000013 | 0x6100
0x726f7731 | 0x0000000000000014 | 0x6e00
0x726f7731 | 0x0000000000000015 | 0x6400
0x726f7731 | 0x0000000000000016 | 0x2000
0x726f7731 | 0x0000000000000017 | 0x7200
0x726f7731 | 0x0000000000000018 | 0x6500
0x726f7731 | 0x0000000000000019 | 0x6100
0x726f7731 | 0x000000000000001a | 0x6400
0x726f7731 | 0x000000000000001b | 0x2000
0x726f7731 | 0x000000000000001c | 0x6200
0x726f7731 | 0x000000000000001d | 0x6900
0x726f7731 | 0x000000000000001e | 0x6e00
0x726f7731 | 0x000000000000001f | 0x6100
0x726f7731 | 0x0000000000000020 | 0x7200
0x726f7731 | 0x0000000000000021 | 0x7900
0x726f7731 | 0x0000000000000022 | 0x2000
0x726f7731 | 0x0000000000000023 | 0x6100
0x726f7731 | 0x0000000000000024 | 0x7300
0x726f7731 | 0x0000000000000025 | 0x2000
0x726f7731 | 0x0000000000000026 | 0x6100
0x726f7731 | 0x0000000000000027 | 0x2000
0x726f7731 | 0x0000000000000028 | 0x6300
0x726f7731 | 0x0000000000000029 | 0x6800
0x726f7731 | 0x000000000000002a | 0x7500
0x726f7731 | 0x000000000000002b | 0x6e00
0x726f7731 | 0x000000000000002c | 0x6b00
0x726f7731 | 0x000000000000002d | 0x2000
0x726f7731 | 0x000000000000002e | 0x6f00
0x726f7731 | 0x000000000000002f | 0x6600
0x726f7731 | 0x0000000000000030 | 0x2000
0x726f7731 | 0x0000000000000031 | 0x6200
0x726f7731 | 0x0000000000000032 | 0x6c00
0x726f7731 | 0x0000000000000033 | 0x6f00
0x726f7731 | 0x0000000000000034 | 0x6200
0x726f7731 | 0x0000000000000035 | 0x2000
0x726f7731 | 0x0000000000000036 | 0x6900
0x726f7731 | 0x0000000000000037 | 0x6e00
0x726f7731 | 0x0000000000000038 | 0x2000
0x726f7731 | 0x0000000000000039 | 0x6100
0x726f7731 | 0x000000000000003a | 0x6e00
0x726f7731 | 0x000000000000003b | 0x6400
0x726f7731 | 0x000000000000003c | 0x2000
0x726f7731 | 0x000000000000003d | 0x6600
0x726f7731 | 0x000000000000003e | 0x7200
0x726f7731 | 0x000000000000003f | 0x6f00
0x726f7731 | 0x0000000000000040 | 0x6d00
0x726f7731 | 0x0000000000000041 | 0x2000
0x726f7731 | 0x0000000000000042 | 0x6300
0x726f7731 | 0x0000000000000043 | 0x6100
0x726f7731 | 0x0000000000000044 | 0x7300
0x726f7731 | 0x0000000000000045 | 0x7300
0x726f7731 | 0x0000000000000046 | 0x6100
0x726f7731 | 0x0000000000000047 | 0x6e00
0x726f7731 | 0x0000000000000048 | 0x6400
0x726f7731 | 0x0000000000000049 | 0x7200
0x726f7731 | 0x000000000000004a | 0x6100
0x726f7731 | 0x000000000000004b | 0x2e00

(76 rows)


Because the basic classes are provided by hector-client in the package, this is a nice feature to have if you want to stream content such as audio or video in and out. I have implemented something similar here. The concept is the same: write the binary content to cassandra, then reconstruct the binary data into a file again.
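Here is a minimal sketch of the chunking idea in plain Java. An in-memory map stands in for a cassandra row, so the class name ChunkedBlob, the chunk size and the map layout are all assumptions for illustration. None of this is the hector-client API.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: splits a blob into chunks and joins them back.
final class ChunkedBlob {
    // Split data into fixed-size chunks keyed by chunk index (playing the role of a column name).
    static Map<Long, byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        Map<Long, byte[]> columns = new TreeMap<>();
        long index = 0;
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, data.length);
            columns.put(index++, Arrays.copyOfRange(data, offset, end));
        }
        return columns;
    }

    // Rebuild the original bytes by concatenating the chunks in ascending index order.
    static byte[] join(Map<Long, byte[]> columns) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : new TreeMap<>(columns).values()) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```

With a real client, each map entry would become one column written under the same row key, and reading the row back in column order would feed join(...).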

That's it, hope you like this.

Saturday, May 10, 2014

Understand cassandra read path by tracing in CQL

In our last article, we explored the cassandra 2.0.7 write path. In this article, we will explore the cassandra read path, using the same investigation method: we will trace the read path by turning on tracing in cqlsh.

Let's start by enabling tracing and setting the consistency level to ALL. Then issue a select statement and start digging into the code. Below is the output of the commands executed in cqlsh, followed by the output in cassandra system.log.
cqlsh:jw_schema1> consistency all;
Consistency level set to ALL.
cqlsh:jw_schema1> tracing on;
Now tracing requests.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
4 | 10 | john30003 | smith | junior
3 | 10 | john30003 | smith | junior
5 | 10 | john30003 | smith | junior
2 | 10 | john30003 | smith | junior

(4 rows)

Tracing session: 66a845c0-d5f3-11e3-bd26-a322c40b8b81

activity | timestamp | source | source_elapsed
execute_cql3_query | 22:25:12,732 | <node3_ip> | 0
Message received from /<node3_ip> | 22:25:11,106 | <node2_ip> | 26
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:11,106 | <node2_ip> | 289
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 442
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 581
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 658
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 724
Scanned 4 rows and matched 4 | 22:25:11,106 | <node2_ip> | 760
Enqueuing response to /<node3_ip> | 22:25:11,106 | <node2_ip> | 785
Sending message to /<node3_ip> | 22:25:11,107 | <node2_ip> | 954
Message received from /<node3_ip> | 22:25:12,430 | <node1_ip> | 76
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,431 | <node1_ip> | 1054
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1250
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1399
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1537
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1718
Scanned 4 rows and matched 4 | 22:25:12,432 | <node1_ip> | 1777
Enqueuing response to /<node3_ip> | 22:25:12,432 | <node1_ip> | 1935
Sending message to /<node3_ip> | 22:25:12,433 | <node1_ip> | 2212
Parsing select * from users LIMIT 10000; | 22:25:12,732 | <node3_ip> | 148
Preparing statement | 22:25:12,732 | <node3_ip> | 259
Determining replicas to query | 22:25:12,733 | <node3_ip> | 941
Enqueuing request to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5556
Enqueuing request to /<node1_ip> | 22:25:12,738 | <node3_ip> | 5645
Enqueuing request to <node3_hostname>/<node3_ip> | 22:25:12,738 | <node3_ip> | 5688
Sending message to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5811
Sending message to / | 22:25:12,738 | <node3_ip> | 5817
Sending message to /<node1_ip> | 22:25:12,738 | <node3_ip> | 6133
Message received from /<node3_ip> | 22:25:12,739 | <node3_ip> | 6558
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,740 | <node3_ip> | 7294
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7506
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7698
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 8222
Read 1 live and 0 tombstoned cells | 22:25:12,741 | <node3_ip> | 8570
Scanned 4 rows and matched 4 | 22:25:12,741 | <node3_ip> | 8634
Enqueuing response to /<node3_ip> | 22:25:12,741 | <node3_ip> | 8689
Sending message to / | 22:25:12,741 | <node3_ip> | 8821
Message received from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Processing response from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Message received from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Processing response from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Message received from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Processing response from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Read 5 live and 0 tombstoned cells | 22:25:13,086 | <node3_ip> | 353631
Read 5 live and 0 tombstoned cells | 22:25:13,087 | <node3_ip> | 355232
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360675
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360908
Request complete | 22:25:13,093 | <node3_ip> | 361266


TRACE [Thrift:186] 2014-05-07 22:25:12,733 (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@43e049 @CL.ALL
DEBUG [Thrift:186] 2014-05-07 22:25:13,604 (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,605 (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@17100f1 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:13,911 (line 159) request complete
DEBUG [Thrift:186] 2014-05-07 22:25:13,915 (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,916 (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@d34f6 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:14,227 (line 159) request complete
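
A trace like the one above is easier to read once it is grouped by node. The following is a small sketch that parses the pipe-separated rows as they are printed here and keeps the largest source_elapsed value per source. The class name TraceSummary and the parsing rules are assumptions for illustration; this is not part of cqlsh.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading cqlsh trace rows of the form
// "activity | timestamp | source | source_elapsed".
final class TraceSummary {
    // Returns the largest source_elapsed seen per source, in order of first appearance.
    static Map<String, Long> maxElapsedBySource(String traceText) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : traceText.split("\\R")) {
            String[] cols = line.split("\\|");
            if (cols.length != 4) {
                continue; // not a trace row
            }
            String source = cols[2].trim();
            String elapsed = cols[3].trim();
            if (!elapsed.matches("\\d+")) {
                continue; // skips the header row and "null" values
            }
            result.merge(source, Long.parseLong(elapsed), Long::max);
        }
        return result;
    }
}
```

Feeding the trace above into maxElapsedBySource(...) would give one number per node, which makes it quick to see which node spent the longest on the request.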

The read path enters through execute_cql3_query, just like the write path. Tracing every line of the code would be too much to discuss, so I summarize the main points below in tandem with the cqlsh tracing output and system.log where applicable. The summary may not be complete, but I give links as the narration goes so that you can study the code in detail yourself.

It starts at CassandraServer.execute_cql3_query(...), as indicated in the cqlsh tracing output. The work done can basically be summarized by this line:
cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();

If you step into the code above, you reach QueryProcessor.process(...), which implements the interface QueryHandler and obtains a valid CQLStatement. Execution continues by calling the method QueryProcessor.processStatement(...). Notice that the logger line in this method shows up in cassandra system.log (of course, you need to enable trace logging for this class for the line to be logged). Access checking and validation are performed here. Once they are done, CQLStatement.execute(...) is executed.

Because we are executing a select statement, the corresponding class that implements the interface CQLStatement is SelectStatement. Here is an extract from SelectStatement.execute(...):
public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
    ConsistencyLevel cl = options.getConsistency();
    List<ByteBuffer> variables = options.getValues();
    if (cl == null)
        throw new InvalidRequestException("Invalid empty consistency level");

    int limit = getLimit(variables);
    long now = System.currentTimeMillis();
    Pageable command;
    if (isKeyRange || usesSecondaryIndexing)
    {
        command = getRangeCommand(variables, limit, now);
    }
    else
    {
        List<ReadCommand> commands = getSliceCommands(variables, limit, now);
        command = commands == null ? null : new Pageable.ReadCommands(commands);
    }

    int pageSize = options.getPageSize();
    // A count query will never be paged for the user, but we always page it internally to avoid OOM.
    // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
    // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
    if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20)
        ...

    if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
    {
        return execute(command, cl, variables, limit, now);
    }
    else
    {
        QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
        if (parameters.isCount)
            return pageCountQuery(pager, variables, pageSize, now);

        // We can't properly do post-query ordering if we page (see #6722)
        if (needsPostQueryOrdering())
            throw new InvalidRequestException("Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the "
                                            + "ORDER BY or the IN and sort client side, or disable paging for this query");

        List<Row> page = pager.fetchPage(pageSize);
        ResultMessage.Rows msg = processResults(page, variables, limit, now);
        if (!pager.isExhausted())
            ...
        return msg;
    }
}

Execution first obtains the Pageable command, then continues to the private method execute(...), which in turn calls getRangeSlice(...). This is where the actual work of retrieving all the rows is done. The implementation of this method does a lot of work, and I recommend that you follow the link and study the code yourself to get a better picture.

When control returns, the rows are sent for further processing by the method processResults(...), which eventually returns the result to the cassandra client.

As you may have noticed, the upper layers of the execution are similar to the write path until control is passed to CQLStatement. That's it for this article, I hope you like it.
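
The paging branch in the extract relies on two calls on the pager, fetchPage(...) and isExhausted(). Here is a minimal sketch of that contract over an in-memory list. The class ListPager and the method countByPaging are made up for illustration and are not cassandra classes; the real QueryPager fetches pages from the cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory pager that mimics the fetchPage / isExhausted contract.
final class ListPager<T> {
    private final List<T> rows;
    private int position = 0;

    ListPager(List<T> rows) {
        this.rows = rows;
    }

    // Returns up to pageSize rows and advances the cursor.
    List<T> fetchPage(int pageSize) {
        int end = Math.min(position + pageSize, rows.size());
        List<T> page = new ArrayList<>(rows.subList(position, end));
        position = end;
        return page;
    }

    // True once every row has been handed out.
    boolean isExhausted() {
        return position >= rows.size();
    }

    // Counts rows page by page, so only one page is held at a time.
    static <T> int countByPaging(List<T> rows, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        ListPager<T> pager = new ListPager<>(rows);
        int count = 0;
        while (!pager.isExhausted()) {
            count += pager.fetchPage(pageSize).size();
        }
        return count;
    }
}
```

This is the same reason the comment in the extract gives for paging a count query internally: the rows never have to sit in memory all at once.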

Friday, May 9, 2014

Understand cassandra write path by tracing in CQL

In this article, we will learn the write path for cassandra 2.0.7. Since cql is the way forward, we will learn the write path by focusing on cqlsh. Let's turn on tracing, set the consistency level to ALL, and insert one row of data. Read the output below:
cqlsh:jw_schema1> tracing on;
Now tracing requests.
cqlsh:jw_schema1> consistency all;
Consistency level set to ALL.

cqlsh:jw_schema1> insert into users (user_id, age, first, last, middle) values ('1', 10, 'john30003', 'smith', 'junior');

Tracing session: 03477650-d43f-11e3-bd26-a322c40b8b81

activity | timestamp | source | source_elapsed
execute_cql3_query | 18:21:25,430 | <node1_ip> | 0
Message received from /<node1_ip> | 18:21:23,795 | <node2_ip> | 52
Acquiring switchLock read lock | 18:21:23,795 | <node2_ip> | 455
Appending to commitlog | 18:21:23,795 | <node2_ip> | 497
Adding to users memtable | 18:21:23,795 | <node2_ip> | 613
Enqueuing response to /<node1_ip> | 18:21:23,800 | <node2_ip> | 5520
Sending message to /<node1_ip> | 18:21:23,801 | <node2_ip> | 6359
Message received from /<node1_ip> | 18:21:25,121 | <node3_ip> | 84
Acquiring switchLock read lock | 18:21:25,123 | <node3_ip> | 1777
Appending to commitlog | 18:21:25,123 | <node3_ip> | 1826
Adding to users memtable | 18:21:25,123 | <node3_ip> | 2121
Enqueuing response to /<node1_ip> | 18:21:25,129 | <node3_ip> | 8278
Sending message to /<node1_ip> | 18:21:25,129 | <node3_ip> | 8563
Parsing insert into users (user_id, age, first, last, middle) values ('1', 10, 'john30003', 'smith', 'junior'); | 18:21:25,430 | <node1_ip> | 93
Preparing statement | 18:21:25,430 | <node1_ip> | 227
Determining replicas for mutation | 18:21:25,433 | <node1_ip> | 2721
Sending message to /<node2_ip> | 18:21:25,433 | <node1_ip> | 3525
Sending message to /<node3_ip> | 18:21:25,434 | <node1_ip> | 3751
Acquiring switchLock read lock | 18:21:25,434 | <node1_ip> | 3963
Appending to commitlog | 18:21:25,434 | <node1_ip> | 3992
Adding to users memtable | 18:21:25,434 | <node1_ip> | 4067
Message received from /<node3_ip> | 18:21:25,730 | <node1_ip> | 300016
Processing response from /<node3_ip> | 18:21:25,730 | <node1_ip> | 300178
Message received from /<node2_ip> | 18:21:25,738 | <node1_ip> | 308225
Processing response from /<node2_ip> | 18:21:25,738 | <node1_ip> | 308676
Request complete | 18:21:25,738 | <node1_ip> | 308825

TRACE [Thrift:186] 2014-05-05 18:24:33,825 (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@17d2390 @CL.ALL
DEBUG [Thrift:186] 2014-05-05 18:24:34,621 (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-05 18:24:34,622 (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@159d495 @CL.ONE
DEBUG [Thrift:186] 2014-05-05 18:24:34,623 (line 159) request complete
DEBUG [Thrift:186] 2014-05-05 18:24:34,626 (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-05 18:24:34,626 (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@75219b @CL.ONE
DEBUG [Thrift:186] 2014-05-05 18:24:34,629 (line 159) request complete

As you may have noticed, the entry path is execute_cql3_query for both write and read. Tracing every line of the code would be too much to discuss, so I summarize the main points below in tandem with the cqlsh tracing output and system.log where applicable. The summary may not be complete, but I give links to the code as the narration goes so that you can study it in detail yourself.

It starts at CassandraServer.execute_cql3_query(...), as indicated in the cqlsh tracing output. The work done can basically be summarized by this line:
cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();

If you step into the line above, you reach QueryProcessor.process(...), which implements the interface QueryHandler and obtains a valid CQLStatement. Execution continues by calling the method QueryProcessor.processStatement(...). Notice that the logger line in this method shows up in cassandra system.log (of course, you need to enable trace logging for this class for the line to be logged). Access checking and validation are performed here. Once they are done, CQLStatement.execute(...) is executed. Because we are inserting a new row of data, the corresponding class that implements the interface CQLStatement is ModificationStatement. Here is an extract from ModificationStatement.execute(...):
public ResultMessage execute(QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
    if (options.getConsistency() == null)
        throw new InvalidRequestException("Invalid empty consistency level");

    if (hasConditions() && options.getProtocolVersion() == 1)
        throw new InvalidRequestException("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2.");

    return hasConditions()
         ? executeWithCondition(queryState, options)
         : executeWithoutCondition(queryState, options);
}

Execution continues to the method ModificationStatement.executeWithoutCondition(...), as our insert statement does not contain IF NOT EXISTS. The method getMutations(...) returns a collection of mutations to be performed.

The collection of mutations is passed to StorageProxy.mutateWithTriggers(...) for further processing. This column family does not have a trigger, so execution continues to the method StorageProxy.mutate(). The description of this method is informative. It reads:

Use this method to have these Mutations applied across all replicas. This method will take care of the possibility of a replica being down and hint the data across to some other replica.

So this method basically saves the data by applying it to all replicas. If you trace along this path, you should notice the cqlsh tracing output appearing along the way.

That's it for this article. In my next article, we will trace the cassandra read path. Thank you.
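
In the trace above, the coordinator only reports Request complete after the responses from both other nodes have been processed, which is what consistency ALL asks for. Here is a small sketch of how many replica acknowledgements each level waits for. The class RequiredAcks and its enum are made up for illustration and only cover three levels; a replication factor of 3 for this keyspace is also an assumption.

```java
// Hypothetical helper: how many replica acknowledgements a write must collect.
final class RequiredAcks {
    enum Level { ONE, QUORUM, ALL }

    // Number of replicas that must acknowledge before the request completes.
    static int blockFor(Level level, int replicationFactor) {
        switch (level) {
            case ONE:
                return 1;
            case QUORUM:
                return replicationFactor / 2 + 1; // a majority of the replicas
            case ALL:
                return replicationFactor;
            default:
                throw new IllegalArgumentException("unknown level " + level);
        }
    }

    // True once enough acknowledgements have arrived for the given level.
    static boolean isSatisfied(Level level, int replicationFactor, int acksReceived) {
        return acksReceived >= blockFor(level, replicationFactor);
    }
}
```

With three replicas, ALL needs all three acknowledgements, so one slow node delays the whole request, while QUORUM would already be satisfied after two.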

Monday, May 5, 2014

Investigate into nodetool cleanup in cassandra

In this article, we are going to study nodetool cleanup in cassandra 1.0.8. The nodetool help describes it as: Run cleanup on one or more column family. That is too general to understand clearly.

So we will trace the code to understand the cleanup operation.

Cleanup is actually a type of compaction. Tracing the code down, CompactionManager.doCleanupCompaction() is the method that does the actual cleanup work.
/**
 * This function goes over each file and removes the keys that the node is not responsible for
 * and only keeps keys that this node is responsible for.
 *
 * @throws IOException
 */
private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, NodeId.OneShotRenewer renewer) throws IOException
{
    assert !cfs.isIndex();
    Table table = cfs.table;
    Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
    boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
    if (ranges.isEmpty())
    {
        logger.info("Cleanup cannot run before a node has joined the ring");
        ...
    }

    for (SSTableReader sstable : sstables)
    {
        CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), false);
        long startTime = System.currentTimeMillis();

        long totalkeysWritten = 0;

        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
                                               ...);
        if (logger.isDebugEnabled())
            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);

        SSTableWriter writer = null;
        SSTableReader newSstable = null;

        logger.info("Cleaning up " + sstable);
        // Calculate the expected compacted filesize
        long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
        String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
        if (compactionFileLocation == null)
            throw new IOException("disk full");

        SSTableScanner scanner = sstable.getDirectScanner();
        Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
        List<IColumn> indexedColumnsInRow = null;

        CleanupInfo ci = new CleanupInfo(sstable, scanner);
        try
        {
            while (scanner.hasNext())
            {
                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                if (Range.isTokenInRanges(row.getKey().token, ranges))
                {
                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                    if (compactedRow.isEmpty())
                        ...
                    writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
                    ...
                }
                else
                {
                    ...
                    if (!indexedColumns.isEmpty() || isCommutative)
                    {
                        if (indexedColumnsInRow != null)
                            ...

                        while (row.hasNext())
                        {
                            IColumn column = row.next();
                            if (column instanceof CounterColumn)
                                renewer.maybeRenew((CounterColumn) column);
                            if (indexedColumns.contains(column.name()))
                            {
                                if (indexedColumnsInRow == null)
                                    indexedColumnsInRow = new ArrayList<IColumn>();

                                ...
                            }
                        }

                        if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                        {
                            // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
                            cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
                        }
                    }
                }
            }
            if (writer != null)
                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
        }
        catch (Exception e)
        {
            if (writer != null)
                ...
            throw FBUtilities.unchecked(e);
        }

        List<SSTableReader> results = new ArrayList<SSTableReader>();
        if (newSstable != null)
        {
            ...

            String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
            long dTime = System.currentTimeMillis() - startTime;
            long startsize = sstable.onDiskLength();
            long endsize = newSstable.onDiskLength();
            double ratio = (double)endsize / (double)startsize;
            logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
        }

        // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
        ...

        cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
    }
}

With this method, it is clear that keys (that is, rows) that do not belong to this node get removed. The following points summarize what this method actually does.

  • for this column family, get the ranges that this node is responsible for.

  • expectedRangeFileSize is half of the expected compacted file size of the input sstable.

  • for each sstable, a loop is run with the following tasks:

  1. check that there is enough disk space for the new compacted sstable.

  2. the executor begins a cleanup compaction.

  3. iterate over the rows in this sstable and check whether each row key token is within the ranges this node is responsible for.

  4. if it is, get the compacted row and append this row to the SSTableWriter.

  5. if it is not, the row is invalidated in the cache. The index entries created for this row are also removed.

  • the executor finishes the cleanup compaction.

  • the cleanup compaction information is written to the logger.

  • the indexes are flushed to disk.

  • the old sstable is removed.
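
The heart of the steps above is the check whether a row token falls inside one of the local ranges. Here is a minimal sketch of that check. It assumes tokens are plain long values and that a range is open on the left and closed on the right, wrapping around the ring when left is not smaller than right. The class names CleanupSketch and TokenRange are made up for illustration and are not cassandra classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "keep only the keys this node owns" decision.
final class CleanupSketch {
    // A token range (left, right]; it wraps around the ring when left >= right.
    static final class TokenRange {
        final long left;
        final long right;

        TokenRange(long left, long right) {
            this.left = left;
            this.right = right;
        }

        boolean contains(long token) {
            if (left >= right) {
                return token > left || token <= right; // wrapping range
            }
            return token > left && token <= right;
        }
    }

    static boolean isTokenInRanges(long token, List<TokenRange> ranges) {
        for (TokenRange range : ranges) {
            if (range.contains(token)) {
                return true;
            }
        }
        return false;
    }

    // Returns the keys to keep; everything else would be dropped by cleanup.
    static List<String> keysToKeep(Map<String, Long> tokenByKey, List<TokenRange> localRanges) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Long> entry : tokenByKey.entrySet()) {
            if (isTokenInRanges(entry.getValue(), localRanges)) {
                kept.add(entry.getKey());
            }
        }
        return kept;
    }
}
```

In the real method the kept rows are appended to a new sstable, and the dropped rows are what makes the new file smaller than the old one.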

That's it about cassandra cleanup. If you learn something and would like to contribute back, please go to the donation page for more information.

Saturday, May 3, 2014

what and why always all time blocked for cassandra pool FlushWriter

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
FlushWriter                       0         0            941         0                53

In a cassandra cluster, I have often noticed that the all time blocked count of the pool FlushWriter keeps increasing while it remains 0 for the other pools. So is this something we should be concerned about?

Snippet from class ColumnFamilyStore:
/*
 * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
 * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
 *
 * There are two other things that maybeSwitchMemtable does.
 * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
 * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
 * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
 * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
 * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
 * called, all data up to the given context has been persisted to SSTables.
 */
private static final ExecutorService flushWriter
        = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                           ...
                                           new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
                                           new NamedThreadFactory("FlushWriter"),
                                           ...);

Just like other stages such as Stage.replicate_on_write, FlushWriter is an instance of JMXEnabledThreadPoolExecutor. It is governed by two configuration settings that you can alter in cassandra.yaml.

  • memtable_flush_writers, which defaults to the number of data_file_directories specified.

  • memtable_flush_queue_size, which defaults to 4.

Whenever maybeSwitchMemtable is called, memtable.flushAndSignal() is called within it.

Notice that Memtable.flushAndSignal() submits the flush to an ExecutorService, which, a few levels down the class hierarchy, is the JMXEnabledThreadPoolExecutor object constructed for the pool FlushWriter mentioned above. So whenever a task is rejected because the queue is full, the method rejectedExecution() is triggered, which eventually increases the count by one.

So that's it. I hope you now have an idea of what the all time blocked count for the pool FlushWriter is and why it increases. When it does, it is an indication that you should alter the two parameters in the cassandra.yaml file.
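
The general pattern can be sketched with the JDK alone: a thread pool with a bounded queue and a rejection handler that counts the rejection and then waits for room in the queue. This is only an illustration of the idea, not cassandra's code. The class BlockingPoolSketch, its field totalBlocked and the demo() method are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical pool that counts how often a submitter had to wait for queue space.
final class BlockingPoolSketch {
    final AtomicLong totalBlocked = new AtomicLong();
    final ThreadPoolExecutor pool;

    BlockingPoolSketch(int writers, int queueSize) {
        pool = new ThreadPoolExecutor(writers, writers, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize),
                (task, executor) -> {
                    totalBlocked.incrementAndGet(); // the "all time blocked" style counter
                    try {
                        executor.getQueue().put(task); // block the submitter until there is room
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(e);
                    }
                });
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // One writer, queue of one: the third submission finds the queue full and is counted.
    static long demo() {
        BlockingPoolSketch sketch = new BlockingPoolSketch(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            sketch.pool.execute(() -> { started.countDown(); awaitQuietly(release); }); // occupies the writer
            started.await();
            sketch.pool.execute(() -> { }); // fills the queue
            Thread submitter = new Thread(() -> sketch.pool.execute(() -> { })); // gets rejected and waits
            submitter.start();
            while (sketch.totalBlocked.get() == 0) {
                Thread.sleep(1);
            }
            release.countDown(); // let the writer drain the queue
            submitter.join();
            sketch.pool.shutdown();
            sketch.pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.totalBlocked.get();
    }
}
```

In this sketch a larger queueSize or more writers makes the counter grow more slowly, which mirrors what the two cassandra.yaml settings control.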

Last, if you learned something and would like to contribute back, please visit our donation page. Thank you.

Friday, May 2, 2014

How often is cassandra minor compaction running and what trigger it

There are two types of compaction in cassandra: minor compaction and major compaction. Today, we are going to look into minor compaction and understand when it is kickstarted.

Following is the description snippet shown when you create a column family using cassandra-cli.
- max_compaction_threshold: The maximum number of SSTables allowed before a
minor compaction is forced. Default is 32, setting to 0 disables minor
compactions.

Decreasing this will cause minor compactions to start more frequently and
be less intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

- min_compaction_threshold: The minimum number of SSTables needed
to start a minor compaction. Default is 4, setting to 0 disables minor
compactions.

Increasing this will cause minor compactions to start less frequently and
be more intensive. The min_compaction_threshold and max_compaction_threshold
boundaries are the number of tables Cassandra attempts to merge together at
once.

So minor compaction is triggered automatically by cassandra, and major compaction is triggered manually via nodetool compact. But when exactly is minor compaction triggered, and by what? That's when we need to trace into the codebase.

Because compaction is performed on the column family, minor compaction is triggered in the class ColumnFamilyStore. Two methods submit this object to the compaction executor to perform the minor compaction, that is during

The behaviour depends on the compaction strategy chosen for the column family. The default is SizeTieredCompactionStrategy, which extends AbstractCompactionStrategy, and the super class starts a single thread to perform this background compaction task. It seems that this optional single-threaded task runs every five minutes.

When either of the two methods mentioned is triggered, the ColumnFamilyStore object is submitted to the background for the single thread to perform compaction.
/**
 * Call this whenever a compaction might be needed on the given columnfamily.
 * It's okay to over-call (within reason) since the compactions are single-threaded,
 * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
 */
public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
{
    Callable<Integer> callable = new Callable<Integer>()
    {
        public Integer call() throws IOException
        {
            if (!cfs.isValid())
                return 0;

            boolean taskExecuted = false;
            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
            List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
            for (AbstractCompactionTask task : tasks)
            {
                if (!task.markSSTablesForCompaction())
                    ...

                taskExecuted = true;
                ...
            }

            // newly created sstables might have made other compactions eligible
            if (taskExecuted)
                ...
            return 0;
        }
    };
    return executor.submit(callable);
}

Notice that when the method getBackgroundTasks is called in submitBackground(), the min_compaction_threshold and max_compaction_threshold that you set on the column family are used to determine whether the conditions for a minor compaction are met.

From experience, I don't know why datastax does not recommend major compaction via nodetool. Maybe it is because the I/O and heap usage spike and may impair the node's handling of requests and responses. But for me, when the node load goes beyond something like 500GB, there may be stale data left in the big sstables, so it might not be such a bad idea to kickstart a major compaction if the stale data can be removed and the node load brought down.
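
To make the role of the two thresholds concrete, here is a simplified sketch of size-tiered bucketing. It groups sstable sizes into buckets of similar size, keeps only buckets with at least min_compaction_threshold members, and caps each at max_compaction_threshold. The class SizeTieredSketch and the 0.5 to 1.5 similarity window are assumptions for illustration; this is not the actual SizeTieredCompactionStrategy code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of size-tiered candidate selection.
final class SizeTieredSketch {
    // Groups sizes so that each member is within 0.5x to 1.5x of its bucket's running average.
    static List<List<Long>> buckets(List<Long> sizes) {
        List<Long> sorted = new ArrayList<>(sizes);
        Collections.sort(sorted);
        List<List<Long>> buckets = new ArrayList<>();
        List<Double> averages = new ArrayList<>();
        for (long size : sorted) {
            boolean placed = false;
            for (int i = 0; i < buckets.size(); i++) {
                double avg = averages.get(i);
                if (size >= avg * 0.5 && size <= avg * 1.5) {
                    List<Long> bucket = buckets.get(i);
                    // update the running average before adding the new member
                    averages.set(i, (avg * bucket.size() + size) / (bucket.size() + 1));
                    bucket.add(size);
                    placed = true;
                    break;
                }
            }
            if (!placed) {
                List<Long> bucket = new ArrayList<>();
                bucket.add(size);
                buckets.add(bucket);
                averages.add((double) size);
            }
        }
        return buckets;
    }

    // Buckets with at least minThreshold sstables are candidates, capped at maxThreshold each.
    static List<List<Long>> candidates(List<Long> sizes, int minThreshold, int maxThreshold) {
        List<List<Long>> result = new ArrayList<>();
        for (List<Long> bucket : buckets(sizes)) {
            if (bucket.size() >= minThreshold) {
                int end = Math.min(bucket.size(), maxThreshold);
                result.add(new ArrayList<>(bucket.subList(0, end)));
            }
        }
        return result;
    }
}
```

With the defaults of 4 and 32, four sstables of roughly the same size are enough to produce a candidate, while a lone large sstable is left alone. That is why stale data can linger in the big sstables.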

Last but not least, if you learn something and would like to contribute back, please go to our donation page.