
Friday, May 16, 2014

Learn and experiment with cassandra trigger

In cassandra 2.0, experimental triggers were introduced, and this seems exciting as it could bring cassandra to a whole new level. Today, using cassandra 2.0.7, we are going to learn about cassandra triggers. But first, let's understand what a conventional database trigger is.

Excerpt from wikipedia,

A database trigger is procedural code that is automatically executed in response to certain events on a particular table or view in a database. The trigger is mostly used for maintaining the integrity of the information on the database. For example, when a new record (representing a new worker) is added to the employees table, new records should also be created in the tables of the taxes, vacations and salaries.

So let's create a table in cassandra and then create a trigger for the table. We will execute these steps via cqlsh, and the example we are going to follow is available in this link. Below are the steps I have taken from studying the example trigger code.

1. build the cassandra jar files in the cassandra base directory.
2. build trigger-example.jar from the trigger example directory (the trigger class implements the ITrigger interface, shown after this list).
3. upload trigger-example.jar to the cassandra node directory /etc/cassandra/triggers
4. copy InvertedIndex.properties to the cassandra node directory /etc/cassandra/
5. make cassandra aware of this jar and properties file addition via nodetool reloadtriggers
nodetool -h localhost reloadtriggers
6. repeat steps 3 and 4 for all the nodes in the cluster.
7. create the column family invertedindex via cqlsh.
8. create the column family standard1 via cqlsh.
9. create the trigger via cqlsh: CREATE TRIGGER test1 ON "Keyspace1"."Standard1" USING 'org.apache.cassandra.triggers.InvertedIndex';
note that you can also drop the trigger via the command drop trigger test1 on "Keyspace1"."Standard1"
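
For reference, a trigger class in cassandra 2.0 is simply a class on the server's classpath that implements the ITrigger interface from the org.apache.cassandra.triggers package. A minimal sketch of that contract (reconstructed here, so double check it against your cassandra version's ITrigger) looks like this:
import java.nio.ByteBuffer;
import java.util.Collection;

import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;

// augment() receives the partition key and the incoming update, and returns
// extra mutations that cassandra will apply together with the original one.
public interface ITrigger
{
    public Collection<RowMutation> augment(ByteBuffer partitionKey, ColumnFamily update);
}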

So here comes the exciting part. When I tried to insert, the response kept complaining that the key may not be empty. It is strange, because we did specify user_id as our key, yet it kept giving an error. So what went wrong?
cqlsh:keyspace1> insert into standard1 (user_id, age) values (124, 11);
Bad Request: Key may not be empty

TRACE [Thrift:5] 2014-05-12 22:17:02,492 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@164b11c @CL.ONE
DEBUG [Thrift:5] 2014-05-12 22:17:02,493 Tracing.java (line 159) request complete
ERROR [Thrift:5] 2014-05-12 22:17:02,493 CustomTThreadPoolServer.java (line 219) Error occurred during processing of message.
java.lang.RuntimeException: Exception while creating trigger on CF with ID: d04577ab-ecc0-3f57-bb01-6febc9d27803
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:167)
at org.apache.cassandra.triggers.TriggerExecutor.execute(TriggerExecutor.java:91)
at org.apache.cassandra.service.StorageProxy.mutateWithTriggers(StorageProxy.java:525)
at org.apache.cassandra.cql3.statements.ModificationStatement.executeWithoutCondition(ModificationStatement.java:542)
at org.apache.cassandra.cql3.statements.ModificationStatement.execute(ModificationStatement.java:526)
at org.apache.cassandra.cql3.QueryProcessor.processStatement(QueryProcessor.java:158)
at org.apache.cassandra.cql3.QueryProcessor.process(QueryProcessor.java:175)
at org.apache.cassandra.thrift.CassandraServer.execute_cql3_query(CassandraServer.java:1959)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4486)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4470)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:201)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:133)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:128)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:123)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:149)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:159)
at org.apache.cassandra.triggers.InvertedIndex.augment(InvertedIndex.java:46)
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:159)
... 15 more
TRACE [Thrift:5] 2014-05-12 22:17:02,495 ThriftSessionManager.java (line 74) ClientState removed for socket ad

So I decided to dig further, and after spending hours I got it to work. The changes are below.

1. change the values in InvertedIndex.properties to lowercase
$ cat /etc/cassandra/InvertedIndex.properties
keyspace=keyspace1
columnfamily=invertedindex

2. rebuild the trigger-example.jar file with a different augment method implementation; remember to deploy it to every node in the cluster and run the reloadtriggers command using nodetool. The updated class:
package org.apache.cassandra.triggers;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.utils.ByteBufferUtil;

// class name and package must match the name used in CREATE TRIGGER
public class InvertedIndex implements ITrigger
{
    // add an extra column (v2 = 999) to every row that is mutated
    public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
    {
        ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
        extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
                ByteBufferUtil.bytes(999)));
        RowMutation rm = new RowMutation("keyspace1", key);
        rm.add(extraUpdate);
        return Collections.singletonList(rm);
    }
}

3. drop both column families and recreate them; the schemas are below.
cqlsh:keyspace1> desc table invertedindex;

CREATE TABLE invertedindex (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

cqlsh:keyspace1> desc table test_table;

CREATE TABLE test_table (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

And now, when inserting into the column family again, voila, 999 was automatically created and there were no more exceptions in the log or in the cqlsh output!
cqlsh:keyspace1> select * from test_table;

(0 rows)

cqlsh:keyspace1> insert into test_table (k, v1) values (0, 0);
cqlsh:keyspace1> select * from test_table;

k | v1 | v2
---+----+-----
0 | 0 | 999

(1 rows)

The conclusion we can draw is that, since triggers are experimental, they are subject to many changes in the future, including the API, and the chances that they could fail are higher ;-). Building a trigger also requires cassandra and java knowledge at the moment. Thus, you should not use this in production, but that does not mean you cannot try the feature. In fact, the cassandra project would like to receive feedback on triggers in order to improve them and make cassandra triggers production ready in the future.

That's it for this article. If you like it, please go to the donation page to contribute back, as funding will keep us writing in the future.

Sunday, May 11, 2014

Store video on cassandra using hector streaming IO

Today, we are going to learn how to stream data in and out of cassandra using hector-client. There are two classes implemented in hector-client which store and read binary data in chunks. That's pretty neat! The two classes are

ChunkOutputStream stores binary data as a blob in cassandra.
ChunkInputStream reads binary data as a blob from cassandra.

Below is a test case coded with these two classes to show how to store and read data.
import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Arrays;

import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.io.ChunkInputStream;
import me.prettyprint.cassandra.io.ChunkOutputStream;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.KsDef;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class HectorStreamTest {

    private Keyspace keyspace;
    private ThriftCluster cassandraCluster;
    private CassandraHostConfigurator cassandraHostConfigurator;
    protected HConnectionManager connectionManager;
    public static KeyspaceDefinition KEYSPACE_DEV;
    public final static String KEYSPACE = "TestKeyspace";
    public final static String BLOB_CF = "Blob";
    public final static CfDef BLOB_CF_DEF = new CfDef(KEYSPACE, BLOB_CF);

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
    }

    @Before
    public void setUp() throws Exception {
        cassandraHostConfigurator = new CassandraHostConfigurator(
                "192.168.0.2:9160");
        connectionManager = new HConnectionManager("just4fun",
                cassandraHostConfigurator);

        KEYSPACE_DEV = new ThriftKsDef(new KsDef(KEYSPACE,
                "org.apache.cassandra.locator.SimpleStrategy",
                Arrays.asList(new CfDef[] { BLOB_CF_DEF })));
        ((ThriftKsDef) KEYSPACE_DEV).setReplicationFactor(1);
        cassandraCluster = new ThriftCluster("just4fun",
                cassandraHostConfigurator);

        keyspace = HFactory.createKeyspace(KEYSPACE, cassandraCluster);

        cassandraCluster.addKeyspace(KEYSPACE_DEV, true);
    }

    @After
    public void tearDown() throws Exception {
        cassandraCluster.dropKeyspace(KEYSPACE);
    }

    @Test
    public void testWriteAndReadStream() throws IOException {
        byte[] value = "hello world, store and read binary as a chunk of blob in and from cassandra.".getBytes();

        // write to cassandra.
        ChunkOutputStream<String> out = new ChunkOutputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get(), 2);
        out.write(value);
        out.close();

        // read from cassandra.
        ChunkInputStream<String> in = new ChunkInputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get());
        int i = -1;
        int written = 0;

        while ((i = in.read()) != -1) {
            assertSame(value[written++], (byte) i);
            byte[] b = { (byte) i };
            System.out.print(new String(b));
        }

        in.close();
    }

}

The keyspace TestKeyspace is created and the table Blob is used to write and read the blob data. The main point is the chunk size in ChunkOutputStream; it is set to 2 here, but you can give another number to control how the bytes are stored. Remember, each byte is displayed as two hexadecimal characters, see the cqlsh output below for more information (the row key 0x726f7731, for example, is the hex encoding of the string "row1"). The test method testWriteAndReadStream() stores the data held in the variable value using ChunkOutputStream.write(); remember to close the stream so that the data is actually flushed to cassandra, or else it stays in the client. To read from cassandra, specify the row key to ChunkInputStream and call its read() method, which returns the data chunk by chunk. When the test is done, the keyspace is removed.
cqlsh:TestKeyspace> select * from "Blob";

key | column1 | value
------------+--------------------+--------
0x726f7731 | 0x0000000000000000 | 0x6800
0x726f7731 | 0x0000000000000001 | 0x6500
0x726f7731 | 0x0000000000000002 | 0x6c00
0x726f7731 | 0x0000000000000003 | 0x6c00
0x726f7731 | 0x0000000000000004 | 0x6f00
0x726f7731 | 0x0000000000000005 | 0x2000
0x726f7731 | 0x0000000000000006 | 0x7700
0x726f7731 | 0x0000000000000007 | 0x6f00
0x726f7731 | 0x0000000000000008 | 0x7200
0x726f7731 | 0x0000000000000009 | 0x6c00
0x726f7731 | 0x000000000000000a | 0x6400
0x726f7731 | 0x000000000000000b | 0x2c00
0x726f7731 | 0x000000000000000c | 0x2000
0x726f7731 | 0x000000000000000d | 0x7300
0x726f7731 | 0x000000000000000e | 0x7400
0x726f7731 | 0x000000000000000f | 0x6f00
0x726f7731 | 0x0000000000000010 | 0x7200
0x726f7731 | 0x0000000000000011 | 0x6500
0x726f7731 | 0x0000000000000012 | 0x2000
0x726f7731 | 0x0000000000000013 | 0x6100
0x726f7731 | 0x0000000000000014 | 0x6e00
0x726f7731 | 0x0000000000000015 | 0x6400
0x726f7731 | 0x0000000000000016 | 0x2000
0x726f7731 | 0x0000000000000017 | 0x7200
0x726f7731 | 0x0000000000000018 | 0x6500
0x726f7731 | 0x0000000000000019 | 0x6100
0x726f7731 | 0x000000000000001a | 0x6400
0x726f7731 | 0x000000000000001b | 0x2000
0x726f7731 | 0x000000000000001c | 0x6200
0x726f7731 | 0x000000000000001d | 0x6900
0x726f7731 | 0x000000000000001e | 0x6e00
0x726f7731 | 0x000000000000001f | 0x6100
0x726f7731 | 0x0000000000000020 | 0x7200
0x726f7731 | 0x0000000000000021 | 0x7900
0x726f7731 | 0x0000000000000022 | 0x2000
0x726f7731 | 0x0000000000000023 | 0x6100
0x726f7731 | 0x0000000000000024 | 0x7300
0x726f7731 | 0x0000000000000025 | 0x2000
0x726f7731 | 0x0000000000000026 | 0x6100
0x726f7731 | 0x0000000000000027 | 0x2000
0x726f7731 | 0x0000000000000028 | 0x6300
0x726f7731 | 0x0000000000000029 | 0x6800
0x726f7731 | 0x000000000000002a | 0x7500
0x726f7731 | 0x000000000000002b | 0x6e00
0x726f7731 | 0x000000000000002c | 0x6b00
0x726f7731 | 0x000000000000002d | 0x2000
0x726f7731 | 0x000000000000002e | 0x6f00
0x726f7731 | 0x000000000000002f | 0x6600
0x726f7731 | 0x0000000000000030 | 0x2000
0x726f7731 | 0x0000000000000031 | 0x6200
0x726f7731 | 0x0000000000000032 | 0x6c00
0x726f7731 | 0x0000000000000033 | 0x6f00
0x726f7731 | 0x0000000000000034 | 0x6200
0x726f7731 | 0x0000000000000035 | 0x2000
0x726f7731 | 0x0000000000000036 | 0x6900
0x726f7731 | 0x0000000000000037 | 0x6e00
0x726f7731 | 0x0000000000000038 | 0x2000
0x726f7731 | 0x0000000000000039 | 0x6100
0x726f7731 | 0x000000000000003a | 0x6e00
0x726f7731 | 0x000000000000003b | 0x6400
0x726f7731 | 0x000000000000003c | 0x2000
0x726f7731 | 0x000000000000003d | 0x6600
0x726f7731 | 0x000000000000003e | 0x7200
0x726f7731 | 0x000000000000003f | 0x6f00
0x726f7731 | 0x0000000000000040 | 0x6d00
0x726f7731 | 0x0000000000000041 | 0x2000
0x726f7731 | 0x0000000000000042 | 0x6300
0x726f7731 | 0x0000000000000043 | 0x6100
0x726f7731 | 0x0000000000000044 | 0x7300
0x726f7731 | 0x0000000000000045 | 0x7300
0x726f7731 | 0x0000000000000046 | 0x6100
0x726f7731 | 0x0000000000000047 | 0x6e00
0x726f7731 | 0x0000000000000048 | 0x6400
0x726f7731 | 0x0000000000000049 | 0x7200
0x726f7731 | 0x000000000000004a | 0x6100
0x726f7731 | 0x000000000000004b | 0x2e00

(76 rows)

cqlsh:TestKeyspace>

Since these basic classes are provided by hector-client in the package, this is a good feature to have if you want to stream in and stream out content like audio or video. I have implemented something similar here. The concept is the same: write the binary content to cassandra and reconstruct the binary data into a file again.
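
As a rough illustration of that idea, the sketch below streams a local file into the Blob column family with ChunkOutputStream and reconstructs it with ChunkInputStream. The file path, row key and chunk size are made up for this example, and it assumes a Keyspace object set up like the one in the test above:
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import me.prettyprint.cassandra.io.ChunkInputStream;
import me.prettyprint.cassandra.io.ChunkOutputStream;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;

public class VideoBlobStore {

    // stream a local file into cassandra using a 4KB chunk size (illustrative)
    public static void store(Keyspace keyspace, String rowKey, String path) throws IOException {
        FileInputStream in = new FileInputStream(path);
        ChunkOutputStream<String> out = new ChunkOutputStream<String>(
                keyspace, "Blob", rowKey, StringSerializer.get(), 4096);
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) != -1)
            out.write(buffer, 0, read);
        in.close();
        out.close(); // close() flushes the remaining bytes to cassandra
    }

    // reconstruct the file from cassandra, byte by byte
    public static void restore(Keyspace keyspace, String rowKey, String path) throws IOException {
        ChunkInputStream<String> in = new ChunkInputStream<String>(
                keyspace, "Blob", rowKey, StringSerializer.get());
        FileOutputStream out = new FileOutputStream(path);
        int b;
        while ((b = in.read()) != -1)
            out.write(b);
        in.close();
        out.close();
    }
}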

That's it, hope you like this.

Saturday, May 10, 2014

Understand cassandra read path by tracing in CQL

In our last article, we explored the cassandra 2.0.7 write path, and in this article, we will explore the read path. We will follow the same investigation method we used for the write path: we will trace the read path by turning tracing on in cqlsh.

Let's start by enabling tracing and setting the consistency level to ALL. Then issue a select statement and start digging into the code. Below is the output of the commands executed in cqlsh, followed by the output in the cassandra system.log.
cqlsh:jw_schema1> consistency all;
Consistency level set to ALL.
cqlsh:jw_schema1> tracing on;
Now tracing requests.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-----------+-------+--------
4 | 10 | john30003 | smith | junior
3 | 10 | john30003 | smith | junior
5 | 10 | john30003 | smith | junior
2 | 10 | john30003 | smith | junior

(4 rows)

Tracing session: 66a845c0-d5f3-11e3-bd26-a322c40b8b81

activity | timestamp | source | source_elapsed
-------------------------------------------------------------------------------------------------+--------------+---------------+----------------
execute_cql3_query | 22:25:12,732 | <node3_ip> | 0
Message received from /<node3_ip> | 22:25:11,106 | <node2_ip> | 26
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:11,106 | <node2_ip> | 289
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 442
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 581
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 658
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 724
Scanned 4 rows and matched 4 | 22:25:11,106 | <node2_ip> | 760
Enqueuing response to /<node3_ip> | 22:25:11,106 | <node2_ip> | 785
Sending message to /<node3_ip> | 22:25:11,107 | <node2_ip> | 954
Message received from /<node3_ip> | 22:25:12,430 | <node1_ip> | 76
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,431 | <node1_ip> | 1054
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1250
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1399
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1537
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1718
Scanned 4 rows and matched 4 | 22:25:12,432 | <node1_ip> | 1777
Enqueuing response to /<node3_ip> | 22:25:12,432 | <node1_ip> | 1935
Sending message to /<node3_ip> | 22:25:12,433 | <node1_ip> | 2212
Parsing select * from users LIMIT 10000; | 22:25:12,732 | <node3_ip> | 148
Preparing statement | 22:25:12,732 | <node3_ip> | 259
Determining replicas to query | 22:25:12,733 | <node3_ip> | 941
Enqueuing request to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5556
Enqueuing request to /<node1_ip> | 22:25:12,738 | <node3_ip> | 5645
Enqueuing request to <node3_hostname>/<node3_ip> | 22:25:12,738 | <node3_ip> | 5688
Sending message to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5811
Sending message to /192.168.0.2 | 22:25:12,738 | <node3_ip> | 5817
Sending message to /<node1_ip> | 22:25:12,738 | <node3_ip> | 6133
Message received from /<node3_ip> | 22:25:12,739 | <node3_ip> | 6558
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,740 | <node3_ip> | 7294
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7506
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7698
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 8222
Read 1 live and 0 tombstoned cells | 22:25:12,741 | <node3_ip> | 8570
Scanned 4 rows and matched 4 | 22:25:12,741 | <node3_ip> | 8634
Enqueuing response to /<node3_ip> | 22:25:12,741 | <node3_ip> | 8689
Sending message to /192.168.0.2 | 22:25:12,741 | <node3_ip> | 8821
Message received from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Processing response from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Message received from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Processing response from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Message received from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Processing response from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Read 5 live and 0 tombstoned cells | 22:25:13,086 | <node3_ip> | 353631
Read 5 live and 0 tombstoned cells | 22:25:13,087 | <node3_ip> | 355232
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360675
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360908
Request complete | 22:25:13,093 | <node3_ip> | 361266

cqlsh:jw_schema1>

TRACE [Thrift:186] 2014-05-07 22:25:12,733 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@43e049 @CL.ALL
DEBUG [Thrift:186] 2014-05-07 22:25:13,604 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,605 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@17100f1 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:13,911 Tracing.java (line 159) request complete
DEBUG [Thrift:186] 2014-05-07 22:25:13,915 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,916 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@d34f6 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:14,227 Tracing.java (line 159) request complete

Just as the write path enters through execute_cql3_query, so does the read path. If you trace the code all the way down, there would be too much to even start the discussion, so I summarize the points below in tandem with the output of cqlsh tracing and system.log where applicable. The summary may not be complete, but I will give you the links as the narration goes so that you can study the details yourself.

It starts at CassandraServer.execute_cql3_query(...) as indicated in the cqlsh tracing output. Basically, the work done can be summarized by this line:
cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();

If you step into the code above, QueryProcessor.process(...), which implements the QueryHandler interface, gets a valid CQLStatement. Execution continues by calling the method QueryProcessor.processStatement(...). Notice that the logger output from this method is shown in the cassandra system.log (of course, you need to enable TRACE for this class in log4j.properties in order for this line to be logged). Access checking and validation are performed here. When checking and validation are done, CQLStatement.execute(...) is executed.

Because we are executing a select statement, the corresponding class that implements the CQLStatement interface is SelectStatement. Extract from SelectStatement.execute(...):
public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
    ConsistencyLevel cl = options.getConsistency();
    List<ByteBuffer> variables = options.getValues();
    if (cl == null)
        throw new InvalidRequestException("Invalid empty consistency level");

    cl.validateForRead(keyspace());

    int limit = getLimit(variables);
    long now = System.currentTimeMillis();
    Pageable command;
    if (isKeyRange || usesSecondaryIndexing)
    {
        command = getRangeCommand(variables, limit, now);
    }
    else
    {
        List<ReadCommand> commands = getSliceCommands(variables, limit, now);
        command = commands == null ? null : new Pageable.ReadCommands(commands);
    }

    int pageSize = options.getPageSize();
    // A count query will never be paged for the user, but we always page it internally to avoid OOM.
    // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
    // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
    if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20)
        pageSize = DEFAULT_COUNT_PAGE_SIZE;

    if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
    {
        return execute(command, cl, variables, limit, now);
    }
    else
    {
        QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
        if (parameters.isCount)
            return pageCountQuery(pager, variables, pageSize, now);

        // We can't properly do post-query ordering if we page (see #6722)
        if (needsPostQueryOrdering())
            throw new InvalidRequestException("Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the "
                                            + "ORDER BY or the IN and sort client side, or disable paging for this query");

        List<Row> page = pager.fetchPage(pageSize);
        ResultMessage.Rows msg = processResults(page, variables, limit, now);
        if (!pager.isExhausted())
            msg.result.metadata.setHasMorePages(pager.state());
        return msg;
    }
}

The execution continues to obtain the Pageable command, and then moves on to the private method execute(...). Then the method getRangeSlice(...) is called; this is where the actual work of retrieving all the rows is done. This method's implementation does a lot of work, and I would recommend you click on the link and study the code yourself to get a better picture.

When control is returned, the rows are sent for further processing using the method processResults(...), which eventually returns the result back to the cassandra client.

As you may have noticed, the upper layer execution is similar to the write path execution, until control is passed to the CQLStatement. That's it for this article, I hope you like it.

Monday, April 21, 2014

Enable or disable sstable compression?

In cassandra 2.0.6, there are a few compression options for sstables; the default is LZ4Compressor. There are others such as DeflateCompressor and SnappyCompressor, or you can choose not to compress the sstables at all.

You can read more about compression in the official documentation, which can be found here.

In this blog post, I will create two scenarios: the first scenario with compression enabled and the second scenario without compression. This is the only difference between the two scenarios.

So I have created 50 thousand cql insert statements and inserted them by feeding the file to cqlsh. First, the schema below uses LZ4Compressor compression; for the no-compression scenario, leave the value for the key sstable_compression empty.
CREATE TABLE users (
user_id text,
age int,
first text,
last text,
middle text,
PRIMARY KEY (user_id)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='storing user data' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

CREATE INDEX idxAge ON users (age);

CREATE INDEX idxLast ON users (last);

jason@localhost:~$ wc -l data.cql
50000 data.cql
jason@localhost:~$ cqlsh 192.168.0.2 9160 -k jw_schema1 -f data.cql
jason@localhost:~$
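
For reference, the 50 thousand insert statements in data.cql can be generated with a short program along these lines (the file name matches the commands above, while the column values are made up purely for illustration):
import java.io.PrintWriter;

public class GenerateData {
    public static void main(String[] args) throws Exception {
        // write 50,000 cql insert statements to data.cql; values are illustrative only
        PrintWriter out = new PrintWriter("data.cql", "UTF-8");
        for (int i = 0; i < 50000; i++) {
            out.println(String.format(
                    "insert into users (user_id, age, first, last, middle) "
                            + "values ('%d', %d, 'john%d', 'smith', 'junior');",
                    i, i % 100, i));
        }
        out.close();
    }
}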

So it looks good; we have a total of 50 thousand rows.
cqlsh:jw_schema1> select count(*) from users limit 100000;

count
-------
50000

(1 rows)

cqlsh:jw_schema1>

I ran nodetool repair, flush, cleanup and then compact. With compression enabled, the sstable count is only 1 and the total file size in this directory is about 4.5MB.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l
total 4576
-rw-r--r-- 1 cassandra cassandra 179 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 599421 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 179 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 598579 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 680 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 971 Apr 15 21:02 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 1 cassandra cassandra 2387391 Apr 15 21:02 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 62512 Apr 15 21:02 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 938894 Apr 15 21:02 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:02 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 6615 Apr 15 21:02 jw_schema1-users-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:02 jw_schema1-users-jb-1-TOC.txt
drwxr-xr-x 2 cassandra cassandra 4096 Apr 15 20:57 snapshots
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

Now, without compression, the total file size is about 11MB. Notice that the size is almost double and the sstable count is two.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l
total 10860
-rw-r--r-- 1 cassandra cassandra 48 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 687656 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 78 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 32 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 455238 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 78 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 136 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 1800 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4393 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 68 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxAge-jb-2-TOC.txt
-rw-r--r-- 1 cassandra cassandra 48 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 685677 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 425 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4392 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 32 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 453259 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 16 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 287 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4393 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users.idxLast-jb-2-TOC.txt
-rw-r--r-- 1 cassandra cassandra 288 Apr 15 21:23 jw_schema1-users-jb-1-CRC.db
-rw-r--r-- 1 cassandra cassandra 4612770 Apr 15 21:23 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:23 jw_schema1-users-jb-1-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 37880 Apr 15 21:23 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 cassandra cassandra 564480 Apr 15 21:23 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:23 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 cassandra cassandra 3984 Apr 15 21:23 jw_schema1-users-jb-1-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:23 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 1 cassandra cassandra 192 Apr 15 21:24 jw_schema1-users-jb-2-CRC.db
-rw-r--r-- 1 cassandra cassandra 3015018 Apr 15 21:24 jw_schema1-users-jb-2-Data.db
-rw-r--r-- 1 cassandra cassandra 71 Apr 15 21:24 jw_schema1-users-jb-2-Digest.sha1
-rw-r--r-- 1 cassandra cassandra 24648 Apr 15 21:24 jw_schema1-users-jb-2-Filter.db
-rw-r--r-- 1 cassandra cassandra 374414 Apr 15 21:24 jw_schema1-users-jb-2-Index.db
-rw-r--r-- 1 cassandra cassandra 4391 Apr 15 21:24 jw_schema1-users-jb-2-Statistics.db
-rw-r--r-- 1 cassandra cassandra 2672 Apr 15 21:24 jw_schema1-users-jb-2-Summary.db
-rw-r--r-- 1 cassandra cassandra 79 Apr 15 21:24 jw_schema1-users-jb-2-TOC.txt
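
If you want to compute those directory totals programmatically instead of eyeballing ls -l, a tiny sketch like this will do it (the path is the one from the listings above):
import java.io.File;

public class DataDirSize {
    public static void main(String[] args) {
        File dir = new File("/var/lib/cassandra/data/jw_schema1/users");
        File[] files = dir.listFiles();
        if (files == null) {
            System.out.println("directory not found");
            return;
        }
        long total = 0;
        for (File f : files)
            if (f.isFile())
                total += f.length();   // sum only the regular files (sstable components)
        System.out.printf("%.1f MB%n", total / (1024.0 * 1024.0));
    }
}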

With the current hardware setup, which is loaded, and with sstable compression enabled, the request sometimes gets an rpc timeout and sometimes the result is returned. However, without compression on the sstables, all the requests executed time out. Below is the query performed via cqlsh.
cqlsh:jw_schema1> select * from users where age > 95 and last = 'smith' allow filtering;
Request did not complete within rpc_timeout.

Apparently, enabling compression does improve read speed and save disk space.

Sunday, April 13, 2014

Research into cassandra nodetool cfhistograms and interpreting its statistics

What is nodetool cfhistogram?

According to the official documentation definition: The nodetool cfhistograms command provides statistics about a table, including read/write latency, row size, column count, and number of SSTables.

If you look at the picture output below, it is entirely different from the cfhistograms output in cassandra 2.0.6. Apparently the output of cfhistograms has been simplified and improved! You can find more information about this improvement here. To get the old style of output, pass --compact to nodetool as a parameter.



Okay, let's start by issuing the nodetool cfhistograms command to our cluster.
jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users
jw_schema1/users histograms

SSTables per Read
1 sstables: 997

Write Latency (microseconds)
No Data

Read Latency (microseconds)
103 us: 1
124 us: 15
149 us: 28
179 us: 131
215 us: 306
258 us: 373
310 us: 66
372 us: 17
446 us: 6
535 us: 21
642 us: 10
770 us: 2
924 us: 1
1109 us: 3
1331 us: 1
1597 us: 1
1916 us: 3
2299 us: 0
2759 us: 2
3311 us: 1
3973 us: 0
4768 us: 0
5722 us: 1
6866 us: 0
8239 us: 1
9887 us: 4
11864 us: 1
14237 us: 1
17084 us: 1

Partition Size (bytes)
149 bytes: 3

Cell Count per Partition
5 cells: 3

The statistics are a bit difficult to understand if you do not know what they mean. Let's begin by studying the cfhistograms code.
private void printCfHistograms(String keySpace, String columnFamily, PrintStream output, boolean compactFormat)
{
    ColumnFamilyStoreMBean store = this.probe.getCfsProxy(keySpace, columnFamily);

    // default is 90 offsets
    long[] offsets = new EstimatedHistogram().getBucketOffsets();

    long[] rrlh = store.getRecentReadLatencyHistogramMicros();
    long[] rwlh = store.getRecentWriteLatencyHistogramMicros();
    long[] sprh = store.getRecentSSTablesPerReadHistogram();
    long[] ersh = store.getEstimatedRowSizeHistogram();
    long[] ecch = store.getEstimatedColumnCountHistogram();

    output.println(String.format("%s/%s histograms", keySpace, columnFamily));
    output.println("");

    if (compactFormat)
    {
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                "Offset", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                "", "", "(micros)", "(micros)", "(bytes)", ""));

        for (int i = 0; i < offsets.length; i++)
        {
            output.println(String.format("%-10d%10s%18s%18s%18s%18s",
                    offsets[i],
                    (i < sprh.length ? sprh[i] : "0"),
                    (i < rwlh.length ? rwlh[i] : "0"),
                    (i < rrlh.length ? rrlh[i] : "0"),
                    (i < ersh.length ? ersh[i] : "0"),
                    (i < ecch.length ? ecch[i] : "0")));
        }
    }
    else
    {
        output.println("SSTables per Read");
        printHistogram(sprh, offsets, "sstables", output);

        output.println("Write Latency (microseconds)");
        printHistogram(rwlh, offsets, "us", output);

        output.println("Read Latency (microseconds)");
        printHistogram(rrlh, offsets, "us", output);

        output.println("Partition Size (bytes)");
        printHistogram(ersh, offsets, "bytes", output);

        output.println("Cell Count per Partition");
        printHistogram(ecch, offsets, "cells", output);
    }
}

Essentially, a ColumnFamilyStoreMBean proxy is obtained through jmx ($ jconsole service:jmx:rmi:///jndi/rmi://192.168.0.2:7199/jmxrmi, also see the picture below) based on the keyspace and column family specified in the nodetool parameters. The default number of bucket offsets is always 90; thus, if you carefully analyze the row output of the compact statistics, you will notice exactly 90 rows each time the nodetool cfhistograms command is triggered.
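
If you would rather pull the same histograms yourself without nodetool, a minimal JMX sketch along these lines should work. The MBean object name pattern below is an assumption based on the cassandra 2.0 MBean layout, so verify it in jconsole; the host, keyspace and table come from the examples in this post:
import java.util.Arrays;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CfHistogramReader {
    public static void main(String[] args) throws Exception {
        // same JMX endpoint as the jconsole URL mentioned above
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://192.168.0.2:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection connection = connector.getMBeanServerConnection();

        // object name assumed from the 2.0 MBean layout; confirm it in jconsole
        ObjectName name = new ObjectName(
                "org.apache.cassandra.db:type=ColumnFamilies,keyspace=jw_schema1,columnfamily=users");

        // counts per bucket, aligned with EstimatedHistogram's offsets
        long[] readLatency = (long[]) connection.getAttribute(name, "RecentReadLatencyHistogramMicros");
        System.out.println(Arrays.toString(readLatency));

        connector.close();
    }
}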



You might ask, why 90 bucket offsets? Well, according to the code documentation:
The series of values to which the counts in `buckets` correspond:
1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, etc.
Thus, a `buckets` of [0, 0, 1, 10] would mean we had seen one value of 3 and 10 values of 4.

The series starts at 1 and grows by 1.2 each time (rounding and removing duplicates). It goes from 1
to around 36M by default (creating 90+1 buckets), which will give us timing resolution from microseconds to
36 seconds, with less precision as the numbers get larger.

Each bucket represents values from (previous bucket offset, current offset].
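
To make that concrete, here is a small sketch that generates offsets following the rule quoted above. It mirrors the documented behaviour rather than the actual EstimatedHistogram code:
import java.util.ArrayList;
import java.util.List;

public class HistogramOffsets {

    // start at 1 and grow by roughly 1.2x, rounding and skipping duplicates
    public static long[] offsets(int size) {
        List<Long> result = new ArrayList<Long>();
        long last = 1;
        result.add(last);
        while (result.size() < size) {
            long next = Math.round(last * 1.2);
            if (next == last)      // rounding produced a duplicate, force progress
                next = last + 1;
            result.add(next);
            last = next;
        }
        long[] offsets = new long[result.size()];
        for (int i = 0; i < offsets.length; i++)
            offsets[i] = result.get(i);
        return offsets;
    }

    public static void main(String[] args) {
        // prints 1 2 3 4 5 6 7 8 10 12 14 17 20 24 29 35 42 50 ...
        for (long offset : offsets(90))
            System.out.print(offset + " ");
    }
}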

Depending on whether the compact parameter is specified, the output will be different. There are six metrics exposed. We will take a closer look at each.

  • offset | the bucket offset


A bucket covers values from the previous offset (exclusive) to its own offset (inclusive); for example, the bucket with offset 179 contains latencies from just above 149 microseconds up to 179 microseconds.




  • SSTables | recent SSTables per read


For each read, the number of sstables accessed is accounted for. Note that each time nodetool cfhistograms is run for this keyspace and column family, this metric is reset.


This metric increases when there are calls into CollationController.java or CacheService.java.




  • Write Latency (micros) | recent write latency histogram in microseconds.


An array representing the write latency histogram in microseconds. Note that each time nodetool cfhistograms is run for this keyspace and column family, this metric is reset.


This metric increases when there are calls into ColumnFamilyStore.java, StorageProxy.java or WeightedQueue.java.




  • Read Latency (micros) | recent read latency histogram in Microseconds.


An array representing the read latency histogram in microseconds. Note that each time nodetool cfhistograms is run for this keyspace and column family, this metric is reset.




  • Partition Size (bytes) | estimated row size histogram


An estimation of the row size in bytes. Note that each time nodetool cfhistograms is run for this keyspace and column family, this metric is NOT reset.


The metric is collected by iterating over the sstables and getting the estimated row size in bytes.




  • Cell Count | estimated column count histogram


An estimation of the number of columns. Note that each time nodetool cfhistograms is run for this keyspace and column family, this metric is NOT reset.


The metric is collected by iterating over the sstables and getting the estimated column count.


So, with these interpretations from the code, let's take another compact-form cfhistograms output and interpret the metrics. First, we will start by generating some statistics:
cqlsh:jw_schema1> select * from users where age > 5 and age < 50 and last = 'smith' allow filtering;

jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users -c
jw_schema1/users histograms

Offset SSTables Write Latency Read Latency Partition Size Cell Count
(micros) (micros) (bytes)
1 997 0 0 0 0
2 0 0 0 0 0
3 0 0 0 0 0
4 0 0 0 0 0
5 0 0 0 0 1000
6 0 0 0 0 0
7 0 0 0 0 0
8 0 0 0 0 0
10 0 0 0 0 0
12 0 0 0 0 0
14 0 0 0 0 0
17 0 0 0 0 0
20 0 0 0 0 0
24 0 0 0 0 0
29 0 0 0 0 0
35 0 0 0 0 0
42 0 0 0 0 0
50 0 0 0 0 0
60 0 0 0 0 0
72 0 0 0 0 0
86 0 0 0 0 0
103 0 0 0 0 0
124 0 0 0 0 0
149 0 0 0 999 0
179 0 0 0 1 0
215 0 0 0 0 0
258 0 0 0 0 0
310 0 0 0 0 0
372 0 0 0 0 0
446 0 0 0 0 0
535 0 0 0 0 0
642 0 0 0 0 0
770 0 0 0 0 0
924 0 0 0 0 0
1109 0 0 0 0 0
1331 0 0 51 0 0
1597 0 0 491 0 0
1916 0 0 95 0 0
2299 0 0 53 0 0
2759 0 0 84 0 0
3311 0 0 95 0 0
3973 0 0 41 0 0
4768 0 0 32 0 0
5722 0 0 25 0 0
6866 0 0 9 0 0
8239 0 0 7 0 0
9887 0 0 6 0 0
11864 0 0 4 0 0
14237 0 0 0 0 0
17084 0 0 2 0 0
20501 0 0 0 0 0
24601 0 0 0 0 0
29521 0 0 0 0 0
35425 0 0 0 0 0
42510 0 0 1 0 0
51012 0 0 0 0 0
61214 0 0 0 0 0
73457 0 0 0 0 0
88148 0 0 0 0 0
105778 0 0 1 0 0
126934 0 0 0 0 0
152321 0 0 0 0 0
182785 0 0 0 0 0
219342 0 0 0 0 0
263210 0 0 0 0 0
315852 0 0 0 0 0
379022 0 0 0 0 0
454826 0 0 0 0 0
545791 0 0 0 0 0
654949 0 0 0 0 0
785939 0 0 0 0 0
943127 0 0 0 0 0
1131752 0 0 0 0 0
1358102 0 0 0 0 0
1629722 0 0 0 0 0
1955666 0 0 0 0 0
2346799 0 0 0 0 0
2816159 0 0 0 0 0
3379391 0 0 0 0 0
4055269 0 0 0 0 0
4866323 0 0 0 0 0
5839588 0 0 0 0 0
7007506 0 0 0 0 0
8409007 0 0 0 0 0
10090808 0 0 0 0 0
12108970 0 0 0 0 0
14530764 0 0 0 0 0
17436917 0 0 0 0 0
20924300 0 0 0 0 0
25109160 0 0 0 0 0


  • There are 51 read requests that spent between 1109 and 1331 microseconds.

  • For 997 read requests, only 1 sstable was accessed per read.

  • Because this is a read operation (a cql select statement), there is no write latency involved.

  • The estimated size of 999 partitions falls into the 149-byte bucket, and one more falls into the 179-byte bucket.

  • There are 1000 partitions with 5 cells each.


These metrics are good for monitoring if you poll them periodically and plot them into graphs. Note that many of the methods covered above have been deprecated in this cassandra version and will probably be removed in a coming release, with a better way of exposing the metrics. If you started on an older cassandra version, for example pre-cassandra 1.1, a cell corresponds to a column while a partition corresponds to a row.
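
For example, if you do poll the raw buckets, an approximate percentile can be derived from the offsets and counts with a few lines like the sketch below. The numbers in main() are a toy subset of the read latency column above, not a full histogram:
public class HistogramPercentile {

    // offsets[i] is the upper bound of bucket i; counts[i] is how many values fell into it
    public static long percentile(long[] offsets, long[] counts, double pct) {
        long total = 0;
        for (long c : counts)
            total += c;
        long target = (long) Math.ceil(total * pct);
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= target)
                return offsets[i];   // the value is at most this bucket's upper bound
        }
        return offsets[offsets.length - 1];
    }

    public static void main(String[] args) {
        long[] offsets = { 1109, 1331, 1597, 1916 };
        long[] counts  = { 0, 51, 491, 95 };
        // roughly: 95% of these reads completed within this many microseconds
        System.out.println(percentile(offsets, counts, 0.95));
    }
}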

Thank you.

Saturday, April 12, 2014

Learn and play with cassandra 2.0.6 snapshot and restore

Snapshots appeared in cassandra as early as version 0.4.0 beta. Today, we are going to learn about cassandra snapshots. Note that if you run a snapshot on a node in a cluster, it only snapshots that node. If you want to snapshot all nodes in a cluster, it is much more efficient to use a parallel ssh tool such as clusterssh or pssh.

Fundamentally, when a snapshot is executed, cassandra links the current sstables into a snapshot directory (hard links, so they do not immediately consume extra space). Be aware, though, that on a node with a huge load the snapshot can over time require up to two times the disk space of that server, because the snapshot keeps holding the old files after the live sstables are compacted away, and taking it may also spike the I/O activity on that node if a large number of sstables is being snapshotted.

Let's get down to the work.

First, ensure that the table (column family) has at least some data.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-------+----------+--------
3 | 34 | john | smith | a
2 | 35 | olee | smith | b
1 | 33 | dan | bar | c

(3 rows)

Then take a snapshot. For instance, I only take a snapshot of the keyspace jw_schema1 and the table users. What this does is that cassandra will flush the data to sstables before the snapshot is taken. For options such as giving the snapshot a meaningful name, check out nodetool help.
jason@localhost:~$ nodetool -h localhost snapshot jw_schema1 -cf users
Requested creating snapshot for: jw_schema1 and table: users
Snapshot directory: 1397292720524

The snapshot made will be stored under the <data_file_directories> that you set in the cassandra.yaml file. For instance:
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls -l snapshots/1397292720524/
total 96K
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 54 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 76 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 68 Apr 12 16:52 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 58 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 87 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 75 Apr 12 16:52 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 2 cassandra cassandra 16 Apr 12 16:52 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 2 cassandra cassandra 45 Apr 12 16:52 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 2 cassandra cassandra 206 Apr 12 16:52 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 2 cassandra cassandra 43 Apr 12 16:52 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 2 cassandra cassandra 4.3K Apr 12 16:52 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 2 cassandra cassandra 79 Apr 12 16:52 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 2 cassandra cassandra 59 Apr 12 16:52 jw_schema1-users-jb-1-Summary.db

If you run md5sum on the data files of the snapshot and of the live data, they match identically.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ md5sum snapshots/1397292720524/*Data*
3d4351d714500417c74de6811b1eae3b snapshots/1397292720524/jw_schema1-users.idxAge-jb-1-Data.db
a430a2d65c0a504fe3ab06344654a89a snapshots/1397292720524/jw_schema1-users.idxLast-jb-1-Data.db
13798e1ffb5ed6a871d768399f54b125 snapshots/1397292720524/jw_schema1-users-jb-1-Data.db
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ md5sum *Data*
3d4351d714500417c74de6811b1eae3b jw_schema1-users.idxAge-jb-1-Data.db
a430a2d65c0a504fe3ab06344654a89a jw_schema1-users.idxLast-jb-1-Data.db
13798e1ffb5ed6a871d768399f54b125 jw_schema1-users-jb-1-Data.db

A snapshot is not meaningful if you cannot restore it back to the node. So from this point onward, we will take a look at how to restore the snapshot back into the node.

Surprisingly, to restore you might expect a command such as nodetool restore backup, but there is no such command. Rather, there are a few ways to restore the given snapshot sstables.

  1. You can use sstableloader,

  2. copy the sstables into <data_file_directories>/data/jw_schema1/users/ and refresh by calling loadNewSSTables via jconsole (see the sketch after this list) or by using nodetool refresh

  3. use a node restart method.
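
For the second option, calling loadNewSSTables over JMX only takes a few lines of Java. This is a rough sketch; the MBean object name follows the pattern cassandra 2.0 uses for column family MBeans, so verify it in jconsole before relying on it:
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RefreshSSTables {
    public static void main(String[] args) throws Exception {
        JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://192.168.0.2:7199/jmxrmi"));
        MBeanServerConnection connection = connector.getMBeanServerConnection();

        // column family MBean for jw_schema1.users; object name assumed, check it in jconsole
        ObjectName name = new ObjectName(
                "org.apache.cassandra.db:type=ColumnFamilies,keyspace=jw_schema1,columnfamily=users");

        // no-argument operation, roughly what `nodetool refresh jw_schema1 users` does
        connection.invoke(name, "loadNewSSTables", new Object[0], new String[0]);
        connector.close();
    }
}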


It sounds like a lot of work to use either of the first two methods, so I'm going to just try the last method of restoring the snapshot sstables.

In order to check that our backup can be restored successfully, we are going to simulate a few failure scenarios (disk failure, accidental deletion) here.

  1. copy the snapshot backup somewhere else.

  2. shutdown cassandra and delete cassandra directory.


Okay, let's continue setting up the simulation environment.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ cp -r snapshots/ ~/cassandra/
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo /etc/init.d/cassandra stop
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ cd /var/lib/cassandra/commitlog/
jason@localhost:/var/lib/cassandra/commitlog$ ls
total 2.6M
-rw-r--r-- 1 cassandra cassandra 32M Apr 11 18:52 CommitLog-3-1397213531634.log
-rw-r--r-- 1 cassandra cassandra 32M Apr 12 18:38 CommitLog-3-1397213531633.log
jason@localhost:/var/lib/cassandra/commitlog$ sudo rm -rf *
jason@localhost:/var/lib/cassandra/commitlog$ cd ../data/jw_schema1/users/
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo rm -rf *
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$

So we have copied the snapshot to a cassandra directory under the home directory, stopped cassandra, and removed all the commitlogs and the table users in keyspace jw_schema1. Note that in this case, the schema for the table users still exists, as the schema is stored in the system keyspace.

And now we will copy the snapshot from the home directory back into cassandra.
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ sudo cp -r ~/cassandra/snapshots/1397292720524/jw_schema1-users* .
jason@localhost:/var/lib/cassandra/data/jw_schema1/users$ ls
total 96K
-rw-r--r-- 1 root root 76 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Data.db
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Filter.db
-rw-r--r-- 1 root root 54 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Index.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Statistics.db
-rw-r--r-- 1 root root 68 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-Summary.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users.idxAge-jb-1-TOC.txt
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 87 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Data.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Filter.db
-rw-r--r-- 1 root root 58 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Index.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Statistics.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-TOC.txt
-rw-r--r-- 1 root root 75 Apr 12 18:50 jw_schema1-users.idxLast-jb-1-Summary.db
-rw-r--r-- 1 root root 43 Apr 12 18:50 jw_schema1-users-jb-1-CompressionInfo.db
-rw-r--r-- 1 root root 206 Apr 12 18:50 jw_schema1-users-jb-1-Data.db
-rw-r--r-- 1 root root 4.3K Apr 12 18:50 jw_schema1-users-jb-1-Statistics.db
-rw-r--r-- 1 root root 45 Apr 12 18:50 jw_schema1-users-jb-1-Index.db
-rw-r--r-- 1 root root 16 Apr 12 18:50 jw_schema1-users-jb-1-Filter.db
-rw-r--r-- 1 root root 79 Apr 12 18:50 jw_schema1-users-jb-1-TOC.txt
-rw-r--r-- 1 root root 59 Apr 12 18:50 jw_schema1-users-jb-1-Summary.db

So far it looks good. Now, if you tail the cassandra system.log and start cassandra, notice that the sstables are being read. If, during this downtime, data that is supposed to be owned by this node was missed, you should now run nodetool repair to make sure the data is in sync.
 INFO [main] 2014-04-12 18:52:32,555 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,568 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users-jb-1 (206 bytes)
INFO [main] 2014-04-12 18:52:32,701 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users.idxLast
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,719 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users.idxLast-jb-1 (87 bytes)
INFO [main] 2014-04-12 18:52:32,802 ColumnFamilyStore.java (line 254) Initializing jw_schema1.users.idxAge
INFO [SSTableBatchOpen:1] 2014-04-12 18:52:32,810 SSTableReader.java (line 223) Opening /var/lib/cassandra/data/jw_schema1/users/jw_schema1-users.idxAge-jb-1 (76 bytes)

jason@localhost:~/$ nodetool -h localhost repair jw_schema1 users
[2014-04-12 18:59:57,477] Starting repair command #1, repairing 1280 ranges for keyspace jw_schema1
..
[2014-04-12 19:00:50,800] Repair command #1 finished

Now we will check whether our data is still there.
jason@localhost:~/$ cqlsh 192.168.0.2 9160 -k jw_schema1
Connected to just4fun at 192.168.0.2:9160.
[cqlsh 4.1.1 | Cassandra 2.0.6 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-------+----------+--------
3 | 34 | john | smith | a
2 | 35 | olee | smith | b
1 | 33 | dan | bar | c
(3 rows)

cqlsh:jw_schema1>

All good. :)

In my humble opinion, because cassandra is built with durability and fault tolerance in mind, snapshots are arguably not strictly needed. Sure, it is fair to argue that someone could delete data accidentally, but if you can prevent that by blocking it from the front end, you can actually save a lot in terms of cluster backup and restore maintenance cost. If you really want to ensure the data is safe, spin up another cluster in another data centre; then the data is guaranteed to be safe from disaster. But hey, there is no harm in learning a new tool in case you might need it later down the road.