Friday, April 25, 2014

code dive into cassandra Stage.REPLICATE_ON_WRITE

If you administer a Cassandra cluster, you may sometimes notice StatusLogger flooding the Cassandra system.log. Below is a snippet found in system.log. So what is happening, and why? Let us read the code.
 INFO [ScheduledTasks:1] 2014-04-17 14:18:00,079 StatusLogger.java (line 65) ReplicateOnWriteStage            17        17         0

StatusLogger starts writing the state of the node's thread pools into the Cassandra system.log under two conditions:

These are indications that the node is under stress. As you can see in system.log, many stages are involved; in this article we focus on one of them, Stage.REPLICATE_ON_WRITE.

What is the replicate on write stage? Its description reads: "Replicate every counter update from the leader to the follower replicas. Accepts the values true and false." Beyond that description, we are going to understand this stage by studying the code.

There are 11 stages involved. When the CassandraDaemon class starts, StageManager is called and the stages are initialized. Of course, Stage.REPLICATE_ON_WRITE is one of them. It is backed by a JMXConfigurableThreadPoolExecutor object configured with 32 threads and a 60-second keep-alive, and this object is also registered with the MBean server when it is created.

Apparently the replicate on write stage is only triggered by column families of type counter, and the code snippet below is the only code that increments the replicate on write metric.
private static Runnable counterWriteTask(final IMutation mutation,
                                         final Collection<InetAddress> targets,
                                         final IWriteResponseHandler responseHandler,
                                         final String localDataCenter,
                                         final ConsistencyLevel consistency_level)
{
    return new DroppableRunnable(StorageService.Verb.MUTATION)
    {
        public void runMayThrow() throws IOException
        {
            assert mutation instanceof CounterMutation;
            final CounterMutation cm = (CounterMutation) mutation;

            // apply mutation
            cm.apply();
            responseHandler.response(null);

            // then send to replicas, if any
            targets.remove(FBUtilities.getBroadcastAddress());
            if (cm.shouldReplicateOnWrite() && !targets.isEmpty())
            {
                // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
                // and we want to avoid blocking too much the MUTATION stage
                StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
                {
                    public void runMayThrow() throws IOException, TimeoutException
                    {
                        // send mutation to other replica
                        sendToHintedEndpoints(cm.makeReplicationMutation(), targets, responseHandler, localDataCenter, consistency_level);
                    }
                });
            }
        }
    };
}

Whenever the ThreadPoolExecutor executes the DroppableRunnable object, the task is run by one of the threads in the executor's thread pool.

The interface IExecutorMBean exposes three metrics:

  • getActiveCount

  • getCompletedTasks

  • getPendingTasks


and the interface JMXEnabledThreadPoolExecutorMBean exposes two more metrics:

  • getTotalBlockedTasks

  • getCurrentlyBlockedTasks


StatusLogger.log prints getActiveCount, getPendingTasks and getCurrentlyBlockedTasks, hence the three columns per stage in the system.log output.

getActiveCount
The active count is actually implemented within the class ThreadPoolExecutor. Each worker that is currently running a task counts as one active task.

getCompletedTasks
The completed task count is a wrapper around ThreadPoolExecutor.getCompletedTaskCount(). Each time a worker finishes executing a task, that counts as one.

getTotalBlockedTasks
When the DebuggableThreadPoolExecutor object is initialized, a rejected execution handler is set. Whenever the ThreadPoolExecutor rejects a command, rejectedExecution() is triggered and executed, so one rejection translates to one count.
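To make these counters concrete, here is a minimal sketch that uses only the plain java.util.concurrent.ThreadPoolExecutor, not Cassandra's own executor classes. The class name StageMetricsSketch, the pool size of 2 and the queue size of 2 are assumptions chosen for the demo; the rejection handler here simply counts, which is only an approximation of the idea described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; it mirrors the idea, not Cassandra's actual executor code.
class StageMetricsSketch
{
    // Returns {active, pending, rejected, completed} observed on a tiny pool.
    static long[] observe()
    {
        final AtomicLong rejected = new AtomicLong();
        // 2 worker threads, a queue of 2 and a 60-second keep-alive (sizes chosen for the demo).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                (task, executor) -> rejected.incrementAndGet()); // one rejection is one count

        final CountDownLatch started = new CountDownLatch(2);
        final CountDownLatch release = new CountDownLatch(1);
        Runnable work = () -> {
            started.countDown();
            try
            {
                release.await(); // keep the worker busy until the metrics are read
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        };

        try
        {
            for (int i = 0; i < 5; i++)
                pool.execute(work); // 2 run, 2 wait in the queue, 1 is rejected
            started.await();        // make sure both workers are really running

            long active = pool.getActiveCount();   // workers currently running a task
            long pending = pool.getQueue().size(); // tasks waiting for a free worker
            long blocked = rejected.get();         // times the rejection handler fired

            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            long completed = pool.getCompletedTaskCount();
            return new long[] { active, pending, blocked, completed };
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        long[] m = observe();
        System.out.println("active=" + m[0] + " pending=" + m[1] + " rejected=" + m[2] + " completed=" + m[3]);
    }
}
```

The three values read before the pool is released line up with the three columns StatusLogger prints per stage: active, pending and blocked.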

That's about it for this article. While studying this code and writing this article, I was amazed at how the code is structured and how complex it is. I really recommend studying ThreadPoolExecutor.java, as the Cassandra stages reference it throughout.


Monday, April 21, 2014

Enable or disable sstable compression?

In Cassandra 2.0.6 there are a few compression options for sstables. The default is LZ4Compressor; the others are DeflateCompressor and SnappyCompressor, or you can choose not to compress the sstables at all.

You can read more about compression in the official documentation.

In this post I create two scenarios: the first with compression enabled and the second without compression. This is the only difference between the two scenarios.

I created 50 thousand insert statements in CQL and loaded them by feeding the file to cqlsh. The schema below uses LZ4Compressor compression; for the no-compression scenario, leave the value of the sstable_compression key empty.
CREATE TABLE users (
user_id text,
age int,
first text,
last text,
middle text,
PRIMARY KEY (user_id)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='storing user data' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

CREATE INDEX idxAge ON users (age);

CREATE INDEX idxLast ON users (last);

jason@localhost:~$ wc -l data.cql
50000 data.cql
jason@localhost:~$ cqlsh 192.168.0.2 9160 -k jw_schema1 -f data.cql
jason@localhost:~$

Looks good, we have 50 thousand rows in total.
cqlsh:jw_schema1> select count(*) from users limit 100000;

count
-------
50000

(1 rows)

cqlsh:jw_schema1>

I ran nodetool repair, flush, cleanup and then compact. With compression enabled, there is only one sstable and the total file size in this directory is about 4.5MB.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l
total 4576
-rw-r--r-- 1 cassandra cassandra 179 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 599421 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 179 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 598579 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 680 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 971 Apr 15 21:02 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 2387391 Apr 15 21:02 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 62512 Apr 15 21:02 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 938894 Apr 15 21:02 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:02 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 6615 Apr 15 21:02 jw_schema1-users-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users-jb-1-TOC.txt
drwxr-xr-x 2 cassandra cassandra 4096 Apr 15 20:57 snapshots
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

Now without compression, the total file size is about 11MB. Notice that the size is more than double and the sstable count is two.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l
total 10860
-rw-r--r-- 1 cassandra cassandra 48 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 687656 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 78 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 32 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 455238 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 78 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4393 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-TOC.txt
-rw-r--r-- 1 cassandra cassandra 48 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 685677 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 425 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 32 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 453259 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 287 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4393 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-TOC.txt
-rw-r--r-- 1 cassandra cassandra 288 Apr 15 21:23 jw_schema1-users-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 4612770 Apr 15 21:23 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:23 jw_schema1-users-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 37880 Apr 15 21:23 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 564480 Apr 15 21:23 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:23 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 3984 Apr 15 21:23 jw_schema1-users-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 192 Apr 15 21:24 jw_schema1-users-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 3015018 Apr 15 21:24 jw_schema1-users-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:24 jw_schema1-users-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 24648 Apr 15 21:24 jw_schema1-users-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 374414 Apr 15 21:24 jw_schema1-users-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:24 jw_schema1-users-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 2672 Apr 15 21:24 jw_schema1-users-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users-jb-2-TOC.txt

On the current hardware setup, which is loaded, with sstable compression enabled the request sometimes gets an rpc timeout and sometimes returns a result. Without sstable compression, however, every request executed timed out. Below is the query performed via cqlsh.
cqlsh:jw_schema1> select * from users where age > 95 and last = 'smith' allow filtering;
Request did not complete within rpc_timeout.

In this test, enabling compression apparently improves read speed as well as saving disk space.

Saturday, April 19, 2014

Introduction to CRUD on cql 3.0 data type

In a previous article we covered basic data definition language, and in this article we are going to cover data manipulation language. Compared to SQL, the collection data types (set, map and list) in cql3 are pretty interesting. Official documentation is available online.

We covered all the data types in cql 3.0 except counter, so now we will create a table with all the data types that can coexist within a table. Let's do it.
CREATE TABLE dataType (
id uuid,
name ascii,
amount bigint,
binary blob,
isSingle boolean,
lamp decimal,
salary double,
works float,
ip inet,
car int,
email set<text>,
kidsAge map<text,int>,
places list<text>,
description text,
lastUpdate timestamp,
myTimeUUID timeuuid,
longDescription varchar,
spending varint,
PRIMARY KEY (id)
);

cqlsh:jw_schema1> desc table datatype;

CREATE TABLE datatype (
id uuid,
amount bigint,
binary blob,
car int,
description text,
email set<text>,
ip inet,
issingle boolean,
kidsage map<text, int>,
lamp decimal,
lastupdate timestamp,
longdescription text,
mytimeuuid timeuuid,
name ascii,
places list<text>,
salary double,
spending varint,
works float,
PRIMARY KEY (id)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

Looks good, the table is created. Let's continue by inserting data into this table!
insert into datatype (id, name, amount, binary, issingle, lamp, salary, works, ip, car, email, kidsage, places, description, lastUpdate, myTimeUUID, longDescription, spending) values (62c36092-82a1-3a00-93d1-46196ee77204, 'jason wee', 123, 0xff, false, 10, 100000, 1, '192.168.0.1', 3, {'a@b.com', 'c@d.com'}, {'juniorA':1, 'juniorB':2}, ['kuala lumpur', 'petaling jaya', 'kepong'], 'hello world', '2014-04-15 00:00:00', maxTimeuuid('2014-04-15 00:05+0000'), 'this is a longer hello world', 123);

cqlsh:jw_schema1> select * from datatype;

id | amount | binary | car | description | email | ip | issingle | kidsage | lamp | lastupdate | longdescription | mytimeuuid | name | places | salary | spending | works
--------------------------------------+--------+--------+-----+-------------+------------------------+-------------+----------+------------------------------+------+--------------------------+------------------------------+--------------------------------------+-----------+---------------------------------------------+--------+----------+-------
62c36092-82a1-3a00-93d1-46196ee77204 | 123 | 0xff | 3 | hello world | {'a@b.com', 'c@d.com'} | 192.168.0.1 | False | {'juniorA': 1, 'juniorB': 2} | 10 | 2014-04-15 00:00:00+0800 | this is a longer hello world | 95fc050f-c431-11e3-7f7f-7f7f7f7f7f7f | jason wee | ['kuala lumpur', 'petaling jaya', 'kepong'] | 1e+05 | 123 | 1

(1 rows)

Good, all the data was inserted. Let's try update, starting with a single update to one column.
cqlsh:jw_schema1> update datatype set amount = 456 where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select amount from datatype where id = 62c36092-82a1-3a00-93d1-46196ee77204;

amount
--------
456

(1 rows)

Looks good too! Now we will update three fields.
cqlsh:jw_schema1> update datatype set binary = 0x68656c6c6f20776f726c64, car = 6, description = 'changed description' where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select binary, car, description from datatype where id = 62c36092-82a1-3a00-93d1-46196ee77204;

binary | car | description
--------------------------+-----+---------------------
0x68656c6c6f20776f726c64 | 6 | changed description

(1 rows)

cqlsh:jw_schema1>

Looks good! A blob value is always prefixed with 0x; the value above is the hex representation of hello world. Let's now change the collection data types.
cqlsh:jw_schema1> update datatype set email = {'e@f.com'} where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select email from datatype;

email
-------------
{'e@f.com'}

(1 rows)

Hmm... the email field values got overwritten. So how do we append to it? Add the new elements with a plus sign! :-)
cqlsh:jw_schema1> update datatype set email = email + {'a@b.com', 'c@d.com'} where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select email from datatype;

email
-----------------------------------
{'a@b.com', 'c@d.com', 'e@f.com'}

(1 rows)

Moving on, let's update the boolean and inet (IP address) columns.
cqlsh:jw_schema1> update datatype set ip = 'a.b.c.d', issingle = True where id = 62c36092-82a1-3a00-93d1-46196ee77204;
Bad Request: unable to make inetaddress from 'a.b.c.d'
cqlsh:jw_schema1> update datatype set ip = '255.255.255.255', issingle = True where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> update datatype set ip = '255.255.255.255', issingle = Trued where id = 62c36092-82a1-3a00-93d1-46196ee77204;
Bad Request: line 1:61 no viable alternative at input 'where'
cqlsh:jw_schema1> update datatype set ip = '255.255.255.255', issingle = True where id = 62c36092-82a1-3a00-93d1-46196ee77204;

cqlsh:jw_schema1> select ip,issingle from datatype;

ip | issingle
-----------------+----------
255.255.255.255 | True

(1 rows)

Simple validation of the IP address and boolean values is enforced. Let's change the map now.
cqlsh:jw_schema1> update datatype set kidsage = {'juniorC':3} where  id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select kidsage from datatype;

kidsage
----------------
{'juniorC': 3}

(1 rows)

cqlsh:jw_schema1> update datatype set kidsage = kidsage + {'juniorA':1, 'juniorB':2} where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select kidsage from datatype;

kidsage
--------------------------------------------
{'juniorA': 1, 'juniorB': 2, 'juniorC': 3}

(1 rows)

Exactly like the set behavior, if you need to append data you add it using the plus sign.
cqlsh:jw_schema1> update datatype set lamp = 12.34, lastupdate = '2014-04-16 20:00', longdescription = 'this is a long long long hello world', mytimeuuid = maxTimeuuid('2014-04-16'), name = 'john smith' where  id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select lamp, lastupdate, longdescription, mytimeuuid, name from datatype;

lamp | lastupdate | longdescription | mytimeuuid | name
-------+--------------------------+--------------------------------------+--------------------------------------+------------
12.34 | 2014-04-16 20:00:00+0800 | this is a long long long hello world | ff72270f-c4b6-11e3-7f7f-7f7f7f7f7f7f | john smith

(1 rows)

cqlsh:jw_schema1>

Everything looks good: the timestamp accepts a value given to the minute, the longer text is stored, and maxTimeuuid accepts a date only.
cqlsh:jw_schema1> update datatype set places = places + ['cheras'], salary = 985621.35, spending = 12355, works = 89.36 where  id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select places, salary, spending, works from datatype;

places | salary | spending | works
-------------------------------------------------------+------------+----------+-------
['kuala lumpur', 'petaling jaya', 'kepong', 'cheras'] | 9.8562e+05 | 12355 | 89.36

(1 rows)

cqlsh:jw_schema1>

Okay, we have covered updates for pretty much all the data types. Let's start removing data, first a few columns of this one row.
cqlsh:jw_schema1> delete amount,binary,car,description,email,ip,issingle,kidsage,lamp,lastupdate,longdescription,mytimeuuid from datatype where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select * from datatype;

id | amount | binary | car | description | email | ip | issingle | kidsage | lamp | lastupdate | longdescription | mytimeuuid | name | places | salary | spending | works
--------------------------------------+--------+--------+------+-------------+-------+------+----------+---------+------+------------+-----------------+------------+------------+-------------------------------------------------------+------------+----------+-------
62c36092-82a1-3a00-93d1-46196ee77204 | null | null | null | null | null | null | null | null | null | null | null | null | john smith | ['kuala lumpur', 'petaling jaya', 'kepong', 'cheras'] | 9.8562e+05 | 12355 | 89.36

(1 rows)

Pretty interesting, we can delete individual columns within a row; their values become null. Now let's delete the entire row.
cqlsh:jw_schema1> delete from datatype where id = 62c36092-82a1-3a00-93d1-46196ee77204;
cqlsh:jw_schema1> select * from datatype;

(0 rows)

Now we have deleted everything. A few observations:

  • identifiers look case insensitive: we created the table as dataType but it is stored as datatype.

  • the collection data types are definitely nice to have, as we don't have to link a few tables.

  • we can delete a few columns or delete the entire row.


 

Friday, April 18, 2014

Should we use cache in application server interface to cassandra?

There are many debates on whether a cache layer should be placed in front of Cassandra.

Initially, if the data set stored in Cassandra is small, well, fine: enable the key cache in Cassandra or, if you are sure enough, perhaps the row cache.

From the application server development point of view, this means less code to develop and a faster development cycle. It could also mean one less thing to test and less ambiguity. Everyone is happy.

But imagine that over time the data stored in Cassandra grows and the read and write requests to Cassandra increase in tandem; some requests may then time out. Remember that as sstables grow, compaction and repair will take a considerable amount of hardware resources (for example memory and block device I/O).

So in this situation it is not such a bad idea to have a cache layer on the application server. To reduce read requests to Cassandra, one can implement a simple caching layer in the application server. Of course, the design and requirements are specific to one's needs, but the general idea is that a caching layer reduces the requests to the Cassandra server, and if caching is used correctly the effect can be tremendous.
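As a rough illustration of such a layer, here is a minimal sketch of a read-through cache with least-recently-used eviction. Everything in it is an assumption for the example: the class name ReadThroughCache is made up, and the loader function stands in for whatever query your application runs against Cassandra.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical read-through cache; the loader stands in for a real Cassandra query.
class ReadThroughCache<K, V>
{
    private final Map<K, V> entries;
    private final Function<K, V> loader;

    ReadThroughCache(final int maxEntries, Function<K, V> loader)
    {
        this.loader = loader;
        // accessOrder=true keeps the least recently used entry at the head of the map.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true)
        {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest)
            {
                return size() > maxEntries; // evict once the cache is over capacity
            }
        };
    }

    // Returns the cached value, or loads it from the backing store on a miss.
    synchronized V get(K key)
    {
        V value = entries.get(key);
        if (value == null)
        {
            value = loader.apply(key);
            if (value != null)
                entries.put(key, value);
        }
        return value;
    }

    synchronized int size()
    {
        return entries.size();
    }
}
```

A caller would wrap its read path, for example new ReadThroughCache<String, String>(10000, id -> loadUserFromCassandra(id)), where loadUserFromCassandra is a placeholder for your own query code. A real deployment also has to decide when cached entries are invalidated after writes, otherwise reads can return stale data.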

There is discussion on how this can be implemented.

Sunday, April 13, 2014

Research into cassandra nodetool cfhistograms and interpret statistics

What is nodetool cfhistograms?

According to the official documentation definition: The nodetool cfhistograms command provides statistics about a table, including read/write latency, row size, column count, and number of SSTables.

If you look at the picture below, its output is entirely different from the cfhistograms output in Cassandra 2.0.6. Apparently the output of cfhistograms has been simplified and improved! To get the previous output format, pass --compact to nodetool as a parameter.



Okay, let's start by issuing the nodetool cfhistograms command against our cluster.
jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users
jw_schema1/users histograms

SSTables per Read
1 sstables: 997

Write Latency (microseconds)
No Data

Read Latency (microseconds)
103 us: 1
124 us: 15
149 us: 28
179 us: 131
215 us: 306
258 us: 373
310 us: 66
372 us: 17
446 us: 6
535 us: 21
642 us: 10
770 us: 2
924 us: 1
1109 us: 3
1331 us: 1
1597 us: 1
1916 us: 3
2299 us: 0
2759 us: 2
3311 us: 1
3973 us: 0
4768 us: 0
5722 us: 1
6866 us: 0
8239 us: 1
9887 us: 4
11864 us: 1
14237 us: 1
17084 us: 1

Partition Size (bytes)
149 bytes: 3

Cell Count per Partition
5 cells: 3

The statistics are a bit difficult to understand if you do not know what they mean. Let's begin by studying the cfhistograms code.
private void printCfHistograms(String keySpace, String columnFamily, PrintStream output, boolean compactFormat)
{
    ColumnFamilyStoreMBean store = this.probe.getCfsProxy(keySpace, columnFamily);

    // default is 90 offsets
    long[] offsets = new EstimatedHistogram().getBucketOffsets();

    long[] rrlh = store.getRecentReadLatencyHistogramMicros();
    long[] rwlh = store.getRecentWriteLatencyHistogramMicros();
    long[] sprh = store.getRecentSSTablesPerReadHistogram();
    long[] ersh = store.getEstimatedRowSizeHistogram();
    long[] ecch = store.getEstimatedColumnCountHistogram();

    output.println(String.format("%s/%s histograms", keySpace, columnFamily));
    output.println("");

    if (compactFormat)
    {
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                                     "Offset", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                                     "", "", "(micros)", "(micros)", "(bytes)", ""));

        for (int i = 0; i < offsets.length; i++)
        {
            output.println(String.format("%-10d%10s%18s%18s%18s%18s",
                                         offsets[i],
                                         (i < sprh.length ? sprh[i] : "0"),
                                         (i < rwlh.length ? rwlh[i] : "0"),
                                         (i < rrlh.length ? rrlh[i] : "0"),
                                         (i < ersh.length ? ersh[i] : "0"),
                                         (i < ecch.length ? ecch[i] : "0")));
        }
    }
    else
    {
        output.println("SSTables per Read");
        printHistogram(sprh, offsets, "sstables", output);

        output.println("Write Latency (microseconds)");
        printHistogram(rwlh, offsets, "us", output);

        output.println("Read Latency (microseconds)");
        printHistogram(rrlh, offsets, "us", output);

        output.println("Partition Size (bytes)");
        printHistogram(ersh, offsets, "bytes", output);

        output.println("Cell Count per Partition");
        printHistogram(ecch, offsets, "cells", output);
    }
}

Essentially, a ColumnFamilyStoreMBean proxy is obtained through JMX ($ jconsole service:jmx:rmi:///jndi/rmi://192.168.0.2:7199/jmxrmi, also see the picture below) for the keyspace and column family specified as nodetool parameters. The default number of bucket offsets is always 90, so if you look carefully at the compact output you will notice exactly 90 rows every time the nodetool cfhistograms command is run.



You may ask, why 90 bucket offsets? According to the code documentation:
The series of values to which the counts in `buckets` correspond:
1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, etc.
Thus, a `buckets` of [0, 0, 1, 10] would mean we had seen one value of 3 and 10 values of 4.

The series starts at 1 and grows by 1.2 each time (rounding and removing duplicates). It goes from 1
to around 36M by default (creating 90+1 buckets), which will give us timing resolution from microseconds to
36 seconds, with less precision as the numbers get larger.

Each bucket represents values from (previous bucket offset, current offset].
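The series described in that documentation can be reproduced with a few lines of Java. This is a sketch written from the description above (start at 1, grow by 1.2, round, skip duplicates); the class name BucketOffsetsSketch and the helper bucketIndex are illustrative names, not Cassandra's API.

```java
import java.util.Arrays;

// Sketch of the bucket-offset series described above; names are illustrative.
class BucketOffsetsSketch
{
    // Builds `size` offsets: start at 1, multiply by 1.2, round, and skip duplicates.
    static long[] newOffsets(int size)
    {
        long[] result = new long[size];
        long last = 1;
        result[0] = last;
        for (int i = 1; i < size; i++)
        {
            long next = Math.round(last * 1.2);
            if (next == last)
                next++; // rounding produced a duplicate, so move on to the next integer
            result[i] = next;
            last = next;
        }
        return result;
    }

    // Index of the bucket that holds `value`: the first offset that is >= value.
    // A value above the last offset yields offsets.length, i.e. the extra overflow bucket.
    static int bucketIndex(long[] offsets, long value)
    {
        int i = Arrays.binarySearch(offsets, value);
        return i >= 0 ? i : -i - 1;
    }

    public static void main(String[] args)
    {
        long[] offsets = newOffsets(90);
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20]
        System.out.println(Arrays.toString(Arrays.copyOf(offsets, 13)));
        // a value of 150 lands in the (149, 179] bucket, so this prints 179
        System.out.println(offsets[bucketIndex(offsets, 150)]);
    }
}
```

With 90 offsets the last value is 25109160, which is also the last row of the compact cfhistograms output.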

Depending on whether the compact parameter is specified, the output differs. Six columns are exposed; let's take a closer look at each.

  • offset | the bucket offset


For example, the bucket with offset 179 covers values from 149 (exclusive) to 179 (inclusive). For the latency columns, this bucket contains latencies above 149 microseconds and up to 179 microseconds.




  • SSTables | recent SSTables per read


The number of sstables accessed by each read. Note that this metric is reset every time nodetool cfhistograms is run for this keyspace and column family.


This metric increases on calls made from CollationController.java or CacheService.java.




  • Write Latency (micros) | recent write latency histogram in microseconds.


An array representing the write latency histogram in microseconds. Note that this metric is reset every time nodetool cfhistograms is run for this keyspace and column family.


This metric increases on calls made from ColumnFamilyStore.java, StorageProxy.java or WeightedQueue.java.




  • Read Latency (micros) | recent read latency histogram in Microseconds.


An array representing the read latency histogram in microseconds. Note that this metric is reset every time nodetool cfhistograms is run for this keyspace and column family.




  • Partition Size (bytes ) | estimated row size histogram


An estimation of the row size in bytes. Note that this metric is NOT reset when nodetool cfhistograms is run for this keyspace and column family.


The metric is collected by iterating over the sstables and getting the estimated row size in bytes from each.




  • Cell Count | estimated column count histogram


The estimated number of columns. Note that this metric is NOT reset when nodetool cfhistograms is run for this keyspace and column family.


The metric is collected by iterating over the sstables and getting the estimated column count from each.


With this interpretation of the code, let's take another cfhistograms output, in compact form, and interpret the metrics. First, we generate some statistics:
cqlsh:jw_schema1> select * from users where age > 5 and age < 50 and last = 'smith' allow filtering;

jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users -c
jw_schema1/users histograms

Offset SSTables Write Latency Read Latency Partition Size Cell Count
(micros) (micros) (bytes)
1 997 0 0 0 0
2 0 0 0 0 0
3 0 0 0 0 0
4 0 0 0 0 0
5 0 0 0 0 1000
6 0 0 0 0 0
7 0 0 0 0 0
8 0 0 0 0 0
10 0 0 0 0 0
12 0 0 0 0 0
14 0 0 0 0 0
17 0 0 0 0 0
20 0 0 0 0 0
24 0 0 0 0 0
29 0 0 0 0 0
35 0 0 0 0 0
42 0 0 0 0 0
50 0 0 0 0 0
60 0 0 0 0 0
72 0 0 0 0 0
86 0 0 0 0 0
103 0 0 0 0 0
124 0 0 0 0 0
149 0 0 0 999 0
179 0 0 0 1 0
215 0 0 0 0 0
258 0 0 0 0 0
310 0 0 0 0 0
372 0 0 0 0 0
446 0 0 0 0 0
535 0 0 0 0 0
642 0 0 0 0 0
770 0 0 0 0 0
924 0 0 0 0 0
1109 0 0 0 0 0
1331 0 0 51 0 0
1597 0 0 491 0 0
1916 0 0 95 0 0
2299 0 0 53 0 0
2759 0 0 84 0 0
3311 0 0 95 0 0
3973 0 0 41 0 0
4768 0 0 32 0 0
5722 0 0 25 0 0
6866 0 0 9 0 0
8239 0 0 7 0 0
9887 0 0 6 0 0
11864 0 0 4 0 0
14237 0 0 0 0 0
17084 0 0 2 0 0
20501 0 0 0 0 0
24601 0 0 0 0 0
29521 0 0 0 0 0
35425 0 0 0 0 0
42510 0 0 1 0 0
51012 0 0 0 0 0
61214 0 0 0 0 0
73457 0 0 0 0 0
88148 0 0 0 0 0
105778 0 0 1 0 0
126934 0 0 0 0 0
152321 0 0 0 0 0
182785 0 0 0 0 0
219342 0 0 0 0 0
263210 0 0 0 0 0
315852 0 0 0 0 0
379022 0 0 0 0 0
454826 0 0 0 0 0
545791 0 0 0 0 0
654949 0 0 0 0 0
785939 0 0 0 0 0
943127 0 0 0 0 0
1131752 0 0 0 0 0
1358102 0 0 0 0 0
1629722 0 0 0 0 0
1955666 0 0 0 0 0
2346799 0 0 0 0 0
2816159 0 0 0 0 0
3379391 0 0 0 0 0
4055269 0 0 0 0 0
4866323 0 0 0 0 0
5839588 0 0 0 0 0
7007506 0 0 0 0 0
8409007 0 0 0 0 0
10090808 0 0 0 0 0
12108970 0 0 0 0 0
14530764 0 0 0 0 0
17436917 0 0 0 0 0
20924300 0 0 0 0 0
25109160 0 0 0 0 0


  • There are 51 read requests that took more than 1109 microseconds and up to 1331 microseconds.

  • 997 reads each accessed 1 sstable. For the SSTables column the offset is a number of sstables, not a time.

  • Because this is a read operation (a cql select statement), there is no write latency involved.

  • 999 partitions have an estimated size in the bucket up to 149 bytes, and one more falls in the bucket up to 179 bytes.

  • There are 1000 partitions with 5 cells.
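Once the buckets are understood, a percentile can be read off a column by walking the cumulative counts. The sketch below does that for the non-zero "Read Latency" rows of the compact output above; the class name HistogramPercentileSketch and the method percentileOffset are hypothetical and not part of nodetool.

```java
// Sketch: read a percentile off one histogram column; not part of nodetool itself.
class HistogramPercentileSketch
{
    // Returns the offset of the bucket in which the given percentile (0 < p <= 1) falls,
    // or -1 if the histogram is empty.
    static long percentileOffset(long[] offsets, long[] counts, double p)
    {
        long total = 0;
        for (long c : counts)
            total += c;
        if (total == 0)
            return -1;

        long threshold = (long) Math.ceil(total * p);
        long seen = 0;
        for (int i = 0; i < counts.length; i++)
        {
            seen += counts[i];
            if (seen >= threshold)
                return offsets[i]; // upper (inclusive) bound of the bucket
        }
        return offsets[offsets.length - 1];
    }

    public static void main(String[] args)
    {
        // Non-zero "Read Latency" rows copied from the compact output above (997 reads in total).
        long[] offsets = { 1331, 1597, 1916, 2299, 2759, 3311, 3973, 4768,
                           5722, 6866, 8239, 9887, 11864, 17084, 42510, 105778 };
        long[] counts  = { 51, 491, 95, 53, 84, 95, 41, 32,
                           25, 9, 7, 6, 4, 2, 1, 1 };
        System.out.println("median <= " + percentileOffset(offsets, counts, 0.50) + " us"); // 1597
        System.out.println("p99    <= " + percentileOffset(offsets, counts, 0.99) + " us"); // 9887
    }
}
```

Because each bucket only records an upper bound, the result is an upper bound too: the median read in this sample took at most 1597 microseconds.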


These metrics are good for monitoring if you poll them periodically and plot them as graphs. Note that many of the methods covered above are deprecated in this Cassandra version; they will probably be removed in a coming release and replaced with a better way of exposing the metrics. If you started on an older Cassandra version, for example pre-Cassandra 1.1, a cell corresponds to a column and a partition corresponds to a row.

Thank you.

Saturday, April 12, 2014

Learn and play with cassandra 2.0.6 snapshot and restore

Snapshots have been in cassandra since as early as version 0.4.0 beta. Today, we are going to learn about cassandra snapshots. Note that if you run snapshot on a node in a cluster, it only snapshots that node. If you want to snapshot all nodes in a cluster, it is much more efficient to use a parallel ssh tool such as clusterssh or pssh.

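Fundamentally, when a snapshot is executed, cassandra creates hard links to the current sstables in a snapshot directory rather than copying the data (note the link count of 2 in the listing further down). A fresh snapshot therefore takes almost no extra disk space. However, the snapshot keeps old sstables alive after compaction has replaced them, so over time it can grow up to the full size of the node load. Because the memtables are flushed first, you may also see a spike in I/O activity on that node.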

Let's get down to the work.

First, ensure at least the table (column family) has data.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-------+----------+--------
3 | 34 | john | smith | a
2 | 35 | olee | smith | b
1 | 33 | dan | bar | c

(3 rows)

Then take a snapshot. In this instance, I only take a snapshot of the keyspace jw_schema1 and the table users. Cassandra will flush the data to sstables before the snapshot is taken. For options such as giving the snapshot a meaningful name, check out the command nodetool help.
jason@localhost:~$ nodetool -h localhost snapshot jw_schema1 -cf users
Requested creating snapshot for: jw_schema1 and table: users
Snapshot directory: 1397292720524

The snapshot is stored under the <data_file_directories> that you set in the cassandra.yaml file, inside the snapshots directory of the table. So for instance,
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l snapshots/1397292720524/
total 96K
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 54 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 76 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 68 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 58 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 87 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 75 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 45 Apr 12 16:52 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 206 Apr 12 16:52 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 59 Apr 12 16:52 jw_schema1-users-jb-1-Summary.db

If you run md5sum on the data files of the snapshot and of the live data, the checksums match exactly.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ md5sum snapshots/1397292720524/*Data*
3d4351d714500417c74de6811b1eae3b snapshots/1397292720524/jw_schema1-users.idxAge-jb-1-Data.db
a430a2d65c0a504fe3ab06344654a89a snapshots/1397292720524/jw_schema1-users.idxLast-jb-1-Data.db
13798e1ffb5ed6a871d768399f54b125 snapshots/1397292720524/jw_schema1-users-jb-1-Data.db
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ md5sum *Data*
3d4351d714500417c74de6811b1eae3b jw_schema1-users.idxAge-jb-1-Data.db
a430a2d65c0a504fe3ab06344654a89a jw_schema1-users.idxLast-jb-1-Data.db
13798e1ffb5ed6a871d768399f54b125 jw_schema1-users-jb-1-Data.db
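The matching checksums, together with the link count of 2 in the ls -l listing of the snapshot directory, are what you would expect if the snapshot files are hard links to the live sstables. The sketch below is not cassandra code; it only illustrates how a hard link behaves, using java.nio. The class name HardLinkDemo and the file names are made up for illustration, and it assumes the temporary directory sits on a filesystem that supports hard links.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration only: shows that a hard link shares the file content with
// the original name and survives deletion of that original name.
final class HardLinkDemo {

    static boolean snapshotSurvivesDelete() {
        try {
            Path dir = Files.createTempDirectory("snapshot-demo");
            Path live = dir.resolve("users-jb-1-Data.db");           // stands in for a live sstable
            Path snap = dir.resolve("snapshot-users-jb-1-Data.db");  // stands in for the snapshot entry

            Files.write(live, "row data".getBytes(StandardCharsets.UTF_8));
            Files.createLink(snap, live);                 // hard link, no data is copied

            boolean sameFile = Files.isSameFile(live, snap);

            Files.delete(live);                           // e.g. the sstable was compacted away
            String content = new String(Files.readAllBytes(snap), StandardCharsets.UTF_8);

            // Clean up the temporary files.
            Files.delete(snap);
            Files.delete(dir);
            return sameFile && content.equals("row data");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("snapshot link still readable: " + snapshotSurvivesDelete());
    }
}
```

This is why a snapshot costs little disk space at first, and why it keeps consuming space once the live sstables it points to have been removed.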

A snapshot is not meaningful if you cannot restore it back to the node. So from this point onward, we will take a look at how to restore the snapshot back into the node.

Surprisingly, there is no restore command; you might expect something like nodetool restore backup, but it does not exist. Rather, there are a few ways to restore the given snapshot sstables.

  1. use sstableloader.

  2. copy the sstables into <data_file_directories>/jw_schema1/users/ and refresh by calling loadNewSSTables via jconsole or by using nodetool refresh.

  3. use a node restart method.


It sounds like a lot of work to use either of the first two methods, so I'm just going to try the last method of restoring the snapshot sstables.

In order to show that our backup can be restored successfully, we are going to simulate a failure (disk failure, accidental delete) here.

  1. copy the snapshot backup somewhere else.

  2. shut down cassandra and delete the commitlog and the table data.


Okay, let's set up the simulation environment.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ cp -r snapshots/ ~/cassandra/
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo /etc/init.d/cassandra stop
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ cd /var/lib/cassandra/commitlog/
jason@localhost:/var/lib/cassandra/commitlog$ ls
total 2.6M
-rw-r--r-- 1 cassandra cassandra 32M Apr 11 18:52 CommitLog-3-1397213531634.log
-rw-r--r-- 1 cassandra cassandra 32M Apr 12 18:38 CommitLog-3-1397213531633.log
jason@localhost:/var/lib/cassandra/commitlog$ sudo rm -rf *
jason@localhost:/var/lib/cassandra/commitlog$ cd ../data/jw_schema1/users/
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo rm -rf *
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

So we have copied the snapshot to a cassandra directory under the home directory, stopped cassandra, and removed all commitlogs and the data of table users in keyspace jw_schema1. Note that in this case, the schema for table users still exists, as the schema is stored in the system keyspace.

And now we will copy the snapshot from the home directory back into cassandra.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo cp -r ~/cassandra/snapshots/1397292720524/jw_schema1-users* .
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls
total 96K
-rw-r--r-- 1 root root 76 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 root root 54 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 root root 68 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 87 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 root root 58 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 root root 75 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 206 Apr 12 18:50 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 root root 45 Apr 12 18:50 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 1 root root 59 Apr 12 18:50 jw_schema1-users-jb-1-Summary.db

So far it looks good. Now if you tail the cassandra system.log and start cassandra, notice that the sstables are being read. If data that this node is supposed to own was written while the node was down, you should now run nodetool repair to make sure the data is in sync.
 INFO [main] 2014-04-12 18:52:32,555 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,568 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users-jb-1 (206 bytes)
INFO [main] 2014-04-12 18:52:32,701 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users.idxLast
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,719 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users.idxLast-jb-1 (87 bytes)
INFO [main] 2014-04-12 18:52:32,802 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users.idxAge
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,810 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users.idxAge-jb-1 (76 bytes)

jason@localhost:~/$ nodetool -h localhost repair jw_schema1 users
[2014-04-12 18:59:57,477] Starting repair command #1, repairing 1280 ranges for keyspace jw_schema1
..
[2014-04-12 19:00:50,800] Repair command #1 finished

Now we will check whether our data is still there.
jason@localhost:~/$ cqlsh 192.168.0.2 9160 -k jw_schema1
Connected to just4fun at 192.168.0.2:9160.
[cqlsh 4.1.1 | Cassandra 2.0.6 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-------+----------+--------
3 | 34 | john | smith | a
2 | 35 | olee | smith | b
1 | 33 | dan | bar | c
(3 rows)

cqlsh:jw_schema1>

All good. :)

In my humble opinion, because cassandra is built with durability and fault tolerance in mind, snapshots are often not really needed. Sure, it is fair to argue that someone could delete the data accidentally, but if you can prevent that by blocking it at the front end, you can save a lot of cost in cluster backup and restore maintenance. If you really want to ensure the data is safe, spin up another cluster in another data centre, so that the data survives the loss of one site. But hey, there is no harm in learning a new tool in case you need it later down the road.

 

Investigate into cassandra 1.0.8 compaction

So what happens when you trigger compact via nodetool? In a nutshell, it goes into a series of low-level java calls.

The execution passes through NodeCmd.java, NodeProbe.java, StorageServiceMBean.java, StorageService.java, ColumnFamilyStore.java, CompactionManager.java, AbstractCompactionTask.java and CompactionTask.java.

Once the NodeProbe object is established, the method forceTableCompaction(...) is called. NodeProbe holds a proxy to StorageServiceMBean, which is the JMX bean interface implemented by the class StorageService.

What forceTableCompaction(...) does is iterate over the column families and start a major compaction on each. Code snippet below:
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
    for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
    {
        cfStore.forceMajorCompaction();
    }
}

So it is pretty clear that the execution gets the valid column families and calls forceMajorCompaction() on each of them. What actually happens is that, within the method forceMajorCompaction(), this object (ColumnFamilyStore) is passed to the CompactionManager singleton to perform an operation known as maximal.

Within the CompactionManager class, the work on cfStore is performed asynchronously: it is wrapped in a Callable and submitted to an executor. To explain better, let's read the general compaction framework below:
public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
    Callable<Object> callable = new Callable<Object>()
    {
        public Object call() throws IOException
        {
            // acquire the write lock long enough to schedule all sstables
            compactionLock.writeLock().lock();
            try
            {
                if (!cfStore.isValid())
                    return this;
                AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                {
                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                        return this;
                    try
                    {
                        // downgrade the lock acquisition
                        compactionLock.readLock().lock();
                        compactionLock.writeLock().unlock();
                        try
                        {
                            return task.execute(executor);
                        }
                        finally
                        {
                            compactionLock.readLock().unlock();
                        }
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
            }
            finally
            {
                // we probably already downgraded
                if (compactionLock.writeLock().isHeldByCurrentThread())
                    compactionLock.writeLock().unlock();
            }
            return this;
        }
    };
    return executor.submit(callable);
}

To summarize:

  • the compaction write lock is acquired.

  • the cfStore object is checked again to see if it is still valid.

  • the compaction strategy is retrieved from the cfStore object.

  • the sstables are marked for compaction.

  • the write lock is downgraded to a read lock and the task is executed on the CompactionExecutor.
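The lock handling in the steps above is the usual write-to-read downgrade pattern. Here is a minimal, self-contained sketch of that pattern with java.util.concurrent, assuming a ReentrantReadWriteLock. The class LockDowngradeDemo and its trace string are made up for illustration and are not taken from the cassandra source.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustration of the write-to-read lock downgrade; not cassandra code.
final class LockDowngradeDemo {

    private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

    // Does the short "scheduling" step under the write lock, then downgrades
    // to the read lock for the long-running step. Returns a trace of what
    // was observed while only the read lock was held.
    static String scheduleThenRun() {
        StringBuilder trace = new StringBuilder();
        LOCK.writeLock().lock();
        try {
            trace.append("scheduled;");

            // Downgrade: take the read lock while still holding the write
            // lock, then release the write lock.
            LOCK.readLock().lock();
            LOCK.writeLock().unlock();
            try {
                trace.append("writeHeld=").append(LOCK.isWriteLockedByCurrentThread()).append(";");
                trace.append("readers=").append(LOCK.getReadLockCount());
            } finally {
                LOCK.readLock().unlock();
            }
        } finally {
            // Only unlock the write lock if the downgrade did not happen.
            if (LOCK.writeLock().isHeldByCurrentThread()) {
                LOCK.writeLock().unlock();
            }
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(scheduleThenRun());
    }
}
```

Taking the read lock before releasing the write lock means no other writer can slip in between the two steps, while other readers are free to proceed once the write lock is released.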


Currently there are two types of compaction strategy in this version, SizeTieredCompactionStrategy and LeveledCompactionStrategy. This discussion continues based on SizeTieredCompactionStrategy.

The real compaction work is done here.
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
    // The collection of sstables passed may be empty (but not null); even if
    // it is not empty, it may compact down to nothing if all rows are deleted.
    assert sstables != null;

    Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
    if (!isCompactionInteresting(toCompact))
        return 0;

    if (compactionFileLocation == null)
        compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    if (partialCompactionsAcceptable())
    {
        // If the compaction file path is null that means we have no space left for this compaction.
        // Try again w/o the largest one.
        if (compactionFileLocation == null)
        {
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all
                // the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }

        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact even the two smallest files, aborting");
            return 0;
        }
    }

    if (DatabaseDescriptor.isSnapshotBeforeCompaction())
        cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

    // sanity check: all sstables must belong to the same cfs
    for (SSTableReader sstable : toCompact)
        assert sstable.descriptor.cfname.equals(cfs.columnFamily);

    CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
    // new sstables from flush can be added during a compaction, but only the compaction can remove them,
    // so in our single-threaded compaction world this is a valid way of determining if we're compacting
    // all the sstables (that existed when we started)
    logger.info("Compacting {}", toCompact);

    long startTime = System.currentTimeMillis();
    long totalkeysWritten = 0;

    long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
    long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
    long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
    if (logger.isDebugEnabled())
        logger.debug("Expected bloom filter size : " + keysPerSSTable);

    AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                  ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                                  : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
    CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
    Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
    Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

    // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
    // replace the old entries. Track entries to preheat here until then.
    Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

    Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
    Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

    if (collector != null)
        collector.beginCompaction(ci);
    try
    {
        if (!nni.hasNext())
        {
            // don't mark compacted in the finally block, since if there _is_ nondeleted data,
            // we need to sync it (via closeAndOpen) first, so there is no period during which
            // a crash could cause data loss.
            cfs.markCompacted(toCompact);
            return 0;
        }

        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
        writers.add(writer);
        while (nni.hasNext())
        {
            AbstractCompactedRow row = nni.next();
            if (row.isEmpty())
                continue;

            long position = writer.append(row);
            totalkeysWritten++;

            if (DatabaseDescriptor.getPreheatKeyCache())
            {
                for (SSTableReader sstable : toCompact)
                {
                    if (sstable.getCachedPosition(row.key, false) != null)
                    {
                        cachedKeys.put(row.key, position);
                        break;
                    }
                }
            }
            if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
            {
                SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                cachedKeyMap.put(toIndex, cachedKeys);
                sstables.add(toIndex);
                if (nni.hasNext())
                {
                    writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                    writers.add(writer);
                    cachedKeys = new HashMap<DecoratedKey, Long>();
                }
            }
        }
    }
    catch (Exception e)
    {
        for (SSTableWriter writer : writers)
            writer.abort();
        throw FBUtilities.unchecked(e);
    }
    finally
    {
        iter.close();
        if (collector != null)
            collector.finishCompaction(ci);
    }

    cfs.replaceCompactedSSTables(toCompact, sstables);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
        SSTableReader key = ssTableReaderMapEntry.getKey();
        for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
            key.cacheKey(entry.getKey(), entry.getValue());
    }

    long dTime = System.currentTimeMillis() - startTime;
    long startsize = SSTable.getTotalBytes(toCompact);
    long endsize = SSTable.getTotalBytes(sstables);
    double ratio = (double)endsize / (double)startsize;

    StringBuilder builder = new StringBuilder();
    builder.append("[");
    for (SSTableReader reader : sstables)
        builder.append(reader.getFilename()).append(",");
    builder.append("]");

    double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
    logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
                              builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
    logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
    return toCompact.size();
}

That's a lot of work done in this method. :-) I summarized some important points below:

  • check if enough sstables are present to compact.

  • check if the disk space is sufficient for this compaction task; if not, drop the largest sstable and try again.

  • snapshot before the compaction happens, if configured.

  • check that the sstables to be compacted belong to the same column family.

  • CompactionExecutorStatsCollector begins the compaction with the AbstractCompactionIterable.

  • create a compaction writer.

  • replace the old sstables with the newly compacted sstables.
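The disk space check is the least obvious of these steps, so here is a simplified sketch of the idea: keep dropping the largest sstable until the remaining set fits, and give up when even the reduced set does not fit. It assumes the expected compacted size is simply the sum of the input sizes, which is a simplification. The class CompactionSpaceSketch and its method are hypothetical and only mirror the shape of the loop in execute(...).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified illustration of "try again without the largest one";
// not the cassandra implementation.
final class CompactionSpaceSketch {

    // Assumption: the compacted output is as large as the sum of the inputs.
    static long expectedSize(List<Long> sizes) {
        long sum = 0;
        for (long size : sizes) {
            sum += size;
        }
        return sum;
    }

    // Returns the sstable sizes that can be compacted within availableBytes,
    // or an empty list when the compaction has to be aborted.
    static List<Long> fitToSpace(List<Long> sstableSizes, long availableBytes) {
        List<Long> toCompact = new ArrayList<Long>(sstableSizes);
        while (expectedSize(toCompact) > availableBytes && toCompact.size() > 1) {
            // Drop the largest file and re-check the remaining set.
            toCompact.remove(Collections.max(toCompact));
        }
        if (expectedSize(toCompact) > availableBytes) {
            return new ArrayList<Long>(); // insufficient space, abort
        }
        return toCompact;
    }

    public static void main(String[] args) {
        System.out.println(fitToSpace(Arrays.asList(100L, 50L, 30L), 90L));
        System.out.println(fitToSpace(Arrays.asList(100L, 50L), 10L));
    }
}
```

With 90 bytes available, the 100-byte file is dropped and the remaining 50-byte and 30-byte files are compacted; with only 10 bytes available nothing fits and the task is aborted.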


I hope you enjoy this writing.