Showing posts with label cassandra2.0.7. Show all posts
Showing posts with label cassandra2.0.7. Show all posts

Saturday, July 16, 2016

Initial learning into apache cassandra paxos

Recently I have been reading into apache lightweight transaction in cassandra 2.0 and interested into how it implemented in code level. From end user perspective, when you manupulating data either insert and update with if not exists, then internally, paxos operation willl be used.

An example of lightweight transaction.

 INSERT INTO USERS (login, email, name, login_count) values ('jbellis', 'jbellis@datastax.com', 'Jonathan Ellis', 1) IF NOT EXISTS  
   
 UPDATE users SET reset_token = null, password = ‘newpassword’ WHERE login = ‘jbellis’ IF reset_token = ‘some-generated-reset-token’  

Essentially the paxos how operation concerntrated in class StorageProxy.We read that from the code documentation,

There are three phases to Paxos:
1. Prepare: the coordinator generates a ballot (timeUUID in our case) and asks replicas to (a) promise
   not to accept updates from older ballots and (b) tell us about the most recent update it has already
   accepted.
2. Accept: if a majority of replicas reply, the coordinator asks replicas to accept the value of the
   highest proposal ballot it heard about, or a new value if no in-progress proposals were reported.
3. Commit (Learn): if a majority of replicas acknowledge the accept request, we can commit the new
   value.

So it involve a few operation before an insert and update can be perform and this is not something you want to replace in bulk operation call with. We see that in class StorageService, several paxos verbs are registered. You can find them mostly in the paxos packages. Following are some useful paxos classes.


Interesting in the class PaxosState, noticed the following
locks - an array of length 1024
call system keyspace class to load and save paxos state.

When you check into cassandra using cqlsh, you will find the following.

 cqlsh:jw_schema1> use system;  
 cqlsh:system> desc tables;  
   
 available_ranges     peers        paxos      range_xfers  
 batches          compaction_history batchlog    local     
 "IndexInfo"        sstable_activity  size_estimates hints     
 views_builds_in_progress peer_events     built_views    
   
 cqlsh:system> select * from paxos;  
   
  row_key | cf_id | in_progress_ballot | most_recent_commit | most_recent_commit_at | most_recent_commit_version | proposal | proposal_ballot | proposal_version  
 ---------+-------+--------------------+--------------------+-----------------------+----------------------------+----------+-----------------+------------------  
   
 (0 rows)  

You can also read the unit test for paxos in cassandra 2.0.17 as can be read here.

This compare and set (cas) operation looks interesting if you want to ensure the value only be created if it not exists, a nifty feature found in apache cassandra 2.0 onward. Feel free to explore further!

Friday, May 16, 2014

Learn and experiment with cassandra trigger

In cassandra 2.0, an experimental trigger was introduced and this seem exciting to bring cassandra into a whole new level. Today, by using cassandra 2.0.7 , we are going to learn cassandra trigger. But first, let's understand what conventional database trigger is.

Excerpt from wikipedia,

A database trigger is procedural code that is automatically executed in response to certain events on a particular table or view in a database. The trigger is mostly used for maintaining the integrity of the information on the database. For example, when a new record (representing a new worker) is added to the employees table, new records should also be created in the tables of the taxes, vacations and salaries.

So let's create a table in cassandra and then create a trigger for the table. We will do these execution via cqlsh and the example we are going to follow available in this link. Below are the steps I have taken from studying into the example trigger code.

1. build cassandra jar files in cassandra base directory.
2. build trigger-example.jar from trigger example directory.
3. upload trigger-example.jar to cassandra node directory in /etc/cassandra/triggers
4. copy InvertedIndex.properties to cassandra node directory in /etc/cassandra/
5. make cassandra aware of this jar and properties file addition via nodetool reloadtriggers
nodetool -h localhost reloadtriggers
5. repeat step 3 and 4 for all the nodes in the cluster.
6. create column family invertedindex via cqlsh.
7. create column family standard1 via cqlsh.
8. create trigger via cqlsh CREATE TRIGGER test1 ON "Keyspace1"."Standard1" USING 'org.apache.cassandra.triggers.InvertedIndex';
note that you can also drop trigger via command drop trigger test1 on "Keyspace1"."Standard1"

So that exciting part comes, when I tried to insert, the response keep on complaining key may not be empty, it is strange that we does specify the user_id as our key but it keep on giving error. So what went wrong?
cqlsh:keyspace1> insert into standard1 (user_id, age) values (124, 11);
Bad Request: Key may not be empty

TRACE [Thrift:5] 2014-05-12 22:17:02,492 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@164b11c @CL.ONE
DEBUG [Thrift:5] 2014-05-12 22:17:02,493 Tracing.java (line 159) request complete
ERROR [Thrift:5] 2014-05-12 22:17:02,493 CustomTThreadPoolServer.java (line 219) Error occurred during processing of message.
java.lang.RuntimeException: Exception while creating trigger on CF with ID: d04577ab-ecc0-3f57-bb01-6febc9d27803
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:167)
at org.apache.cassandra.triggers.TriggerExecutor.execute(TriggerExecutor.java:91)
at org.apache.cassandra.service.StorageProxy.mutateWithTriggers(StorageProxy.java:525)
at org.apache.cassandra.cql3.statements.ModificationStatement.executeWithoutCondition(ModificationStatement.java:542)
at org.apache.cassandra.cql3.statements.ModificationStatement.execute(ModificationStatement.java:526)
at org.apache.cassandra.cql3.QueryProcessor.processStatement(QueryProcessor.java:158)
at org.apache.cassandra.cql3.QueryProcessor.process(QueryProcessor.java:175)
at org.apache.cassandra.thrift.CassandraServer.execute_cql3_query(CassandraServer.java:1959)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4486)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4470)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:201)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:133)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:128)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:123)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:149)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:159)
at org.apache.cassandra.triggers.InvertedIndex.augment(InvertedIndex.java:46)
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:159)
... 15 more
TRACE [Thrift:5] 2014-05-12 22:17:02,495 ThriftSessionManager.java (line 74) ClientState removed for socket ad

So I decided to go into further, and I got it to works after spending hours. Changes below.

1. change to lower letters for InvertedIndex.properties
$ cat /etc/cassandra/InvertedIndex.properties
keyspace=keyspace1
columnfamily=invertedindex

2. rebuild trigger-example.jar file with different augment method implementation and remember deploy this to every node in the cluster and execute command reloadtriggers using nodetool.
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.ArrayBackedSortedColumns;
import java.util.Collections;

public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
ByteBufferUtil.bytes(999)));
RowMutation rm = new RowMutation("keyspace1", key);
rm.add(extraUpdate);
return Collections.singletonList(rm);
}

3. drop both column family and recreate again, below are the schema.
cqlsh:keyspace1> desc table invertedindex;

CREATE TABLE invertedindex (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

cqlsh:keyspace1> desc table test_table;

CREATE TABLE test_table (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

and now when insert again into the cf, voila, 999 was auto created and no more exception in the log or cqlsh output!
cqlsh:keyspace1> select * from test_table;

(0 rows)

cqlsh:keyspace1> insert into test_table (k, v1) values (0, 0);
cqlsh:keyspace1> select * from test_table;

k | v1 | v2
---+----+-----
0 | 0 | 999

(1 rows)

Conclusion that we can draw is, since it is experimental, that means in the future, trigger is subject to many changes including API and chances that it could fail is higher ;-). It also need cassandra and java knowledge to build trigger at the mean time. Thus, you should not use this in production but that does not mean you cannot try this feature. In fact, cassandra would like to receive feedback  on the trigger to improve or make cassandra trigger production ready in the future.

That's it for this article, if you like, please go to the donation page to contribute back as funding will keep us continue to write in the future.

Sunday, May 11, 2014

Store video on cassandra and using hector streaming IO

Today, we are going to learn how to stream in and stream out using hector-client.  There are two classes implemented in hector-client which storing binary in chunk and reading binary in chunk. That's pretty neat! The two mentioned classes are

ChunkOutputStream storing binary as blog into cassandra.
ChunkInputStream read binary as blog from cassandra.

Below is a test case coded with the two classes to show how to store and read data using the two mentioned class.
import static org.junit.Assert.*;

import java.io.IOException;
import java.util.Arrays;

import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.io.ChunkInputStream;
import me.prettyprint.cassandra.io.ChunkOutputStream;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.KsDef;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class HectorStreamTest {

private Keyspace keyspace;
private ThriftCluster cassandraCluster;
private CassandraHostConfigurator cassandraHostConfigurator;
protected HConnectionManager connectionManager;
public static KeyspaceDefinition KEYSPACE_DEV;
public final static String KEYSPACE = "TestKeyspace";
public final static String BLOB_CF = "Blob";
public final static CfDef BLOB_CF_DEF = new CfDef(KEYSPACE, BLOB_CF);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
}

@Before
public void setUp() throws Exception {
cassandraHostConfigurator = new CassandraHostConfigurator(
"192.168.0.2:9160");
connectionManager = new HConnectionManager("just4fun",
cassandraHostConfigurator);

KEYSPACE_DEV = new ThriftKsDef(new KsDef(KEYSPACE,
"org.apache.cassandra.locator.SimpleStrategy",
Arrays.asList(new CfDef[] { BLOB_CF_DEF })));
((ThriftKsDef) KEYSPACE_DEV).setReplicationFactor(1);
cassandraCluster = new ThriftCluster("just4fun",
cassandraHostConfigurator);

keyspace = HFactory.createKeyspace(KEYSPACE, cassandraCluster);

cassandraCluster.addKeyspace(KEYSPACE_DEV, true);
}

@After
public void tearDown() throws Exception {
cassandraCluster.dropKeyspace(KEYSPACE);
}

@Test
public void testWriteAndReadStream() throws IOException {
byte[] value = "hello world, store and read binary as a chunk of blob in and from cassandra.".getBytes();

// write to cassandra.
ChunkOutputStream<String> out = new ChunkOutputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get(), 2);
out.write(value);
out.close();

// read from cassandra.
ChunkInputStream<String> in = new ChunkInputStream<String>(keyspace, BLOB_CF, "row1", StringSerializer.get());
int i = -1;
int written = 0;

while ((i = in.read()) != -1) {
assertSame(value[written++], (byte) i);
byte[] b = {(byte)i};
System.out.print(new String(b));
}

in.close();
}

}

Keyspace TestKeyspace is created and table Blob is use to write and read the blob data. The main point is probably on the chunk size in ChunkOutputStream, it is set to 2 but you can give another even number to store the byte. Remember, each byte is represented by two hexadecimal characters, see cqlsh output below for more information. The test method testWriteAndReadStream() store the data in variable value using ChunkOutputStream.write() and remember to close it so that the data actually flush to cassandra or else it will stay in client code. To read from cassandra, is by specifing the row to ChunkInputStream and calling method read() which it will return the data in chunk. When the test is done, keyspace is removed.
cqlsh:TestKeyspace> select * from "Blob";

key | column1 | value
------------+--------------------+--------
0x726f7731 | 0x0000000000000000 | 0x6800
0x726f7731 | 0x0000000000000001 | 0x6500
0x726f7731 | 0x0000000000000002 | 0x6c00
0x726f7731 | 0x0000000000000003 | 0x6c00
0x726f7731 | 0x0000000000000004 | 0x6f00
0x726f7731 | 0x0000000000000005 | 0x2000
0x726f7731 | 0x0000000000000006 | 0x7700
0x726f7731 | 0x0000000000000007 | 0x6f00
0x726f7731 | 0x0000000000000008 | 0x7200
0x726f7731 | 0x0000000000000009 | 0x6c00
0x726f7731 | 0x000000000000000a | 0x6400
0x726f7731 | 0x000000000000000b | 0x2c00
0x726f7731 | 0x000000000000000c | 0x2000
0x726f7731 | 0x000000000000000d | 0x7300
0x726f7731 | 0x000000000000000e | 0x7400
0x726f7731 | 0x000000000000000f | 0x6f00
0x726f7731 | 0x0000000000000010 | 0x7200
0x726f7731 | 0x0000000000000011 | 0x6500
0x726f7731 | 0x0000000000000012 | 0x2000
0x726f7731 | 0x0000000000000013 | 0x6100
0x726f7731 | 0x0000000000000014 | 0x6e00
0x726f7731 | 0x0000000000000015 | 0x6400
0x726f7731 | 0x0000000000000016 | 0x2000
0x726f7731 | 0x0000000000000017 | 0x7200
0x726f7731 | 0x0000000000000018 | 0x6500
0x726f7731 | 0x0000000000000019 | 0x6100
0x726f7731 | 0x000000000000001a | 0x6400
0x726f7731 | 0x000000000000001b | 0x2000
0x726f7731 | 0x000000000000001c | 0x6200
0x726f7731 | 0x000000000000001d | 0x6900
0x726f7731 | 0x000000000000001e | 0x6e00
0x726f7731 | 0x000000000000001f | 0x6100
0x726f7731 | 0x0000000000000020 | 0x7200
0x726f7731 | 0x0000000000000021 | 0x7900
0x726f7731 | 0x0000000000000022 | 0x2000
0x726f7731 | 0x0000000000000023 | 0x6100
0x726f7731 | 0x0000000000000024 | 0x7300
0x726f7731 | 0x0000000000000025 | 0x2000
0x726f7731 | 0x0000000000000026 | 0x6100
0x726f7731 | 0x0000000000000027 | 0x2000
0x726f7731 | 0x0000000000000028 | 0x6300
0x726f7731 | 0x0000000000000029 | 0x6800
0x726f7731 | 0x000000000000002a | 0x7500
0x726f7731 | 0x000000000000002b | 0x6e00
0x726f7731 | 0x000000000000002c | 0x6b00
0x726f7731 | 0x000000000000002d | 0x2000
0x726f7731 | 0x000000000000002e | 0x6f00
0x726f7731 | 0x000000000000002f | 0x6600
0x726f7731 | 0x0000000000000030 | 0x2000
0x726f7731 | 0x0000000000000031 | 0x6200
0x726f7731 | 0x0000000000000032 | 0x6c00
0x726f7731 | 0x0000000000000033 | 0x6f00
0x726f7731 | 0x0000000000000034 | 0x6200
0x726f7731 | 0x0000000000000035 | 0x2000
0x726f7731 | 0x0000000000000036 | 0x6900
0x726f7731 | 0x0000000000000037 | 0x6e00
0x726f7731 | 0x0000000000000038 | 0x2000
0x726f7731 | 0x0000000000000039 | 0x6100
0x726f7731 | 0x000000000000003a | 0x6e00
0x726f7731 | 0x000000000000003b | 0x6400
0x726f7731 | 0x000000000000003c | 0x2000
0x726f7731 | 0x000000000000003d | 0x6600
0x726f7731 | 0x000000000000003e | 0x7200
0x726f7731 | 0x000000000000003f | 0x6f00
0x726f7731 | 0x0000000000000040 | 0x6d00
0x726f7731 | 0x0000000000000041 | 0x2000
0x726f7731 | 0x0000000000000042 | 0x6300
0x726f7731 | 0x0000000000000043 | 0x6100
0x726f7731 | 0x0000000000000044 | 0x7300
0x726f7731 | 0x0000000000000045 | 0x7300
0x726f7731 | 0x0000000000000046 | 0x6100
0x726f7731 | 0x0000000000000047 | 0x6e00
0x726f7731 | 0x0000000000000048 | 0x6400
0x726f7731 | 0x0000000000000049 | 0x7200
0x726f7731 | 0x000000000000004a | 0x6100
0x726f7731 | 0x000000000000004b | 0x2e00

(76 rows)

cqlsh:TestKeyspace>

As basic classes provided by hector-client is in the package, it's a good to have feature to have if you want to stream in and stream out content like audio or video. I have implemented something similar here. The concept is similar, write binary content to cassandra and reconstruct the binary data into the file again.

That's it, hope you like this.

Saturday, May 10, 2014

Understand cassandra read path by tracing in CQL

In our last article, we explored cassandra 2.0.7 write path and in this article, we will explore cassandra read path. We will again follow the same investigation method we used on write for read too. That is, we will trace the read path by turning on tracing in cqlsh.

Let's start by enabling tracing and consistency to all. Then issue statement select and start to dig into code. Below are the output of the commands executed in cqlsh and output in cassandra system.log
cqlsh:jw_schema1> consistency all;
Consistency level set to ALL.
cqlsh:jw_schema1> tracing on;
Now tracing requests.
cqlsh:jw_schema1> select * from users;

user_id | age | first | last | middle
---------+-----+-----------+-------+--------
4 | 10 | john30003 | smith | junior
3 | 10 | john30003 | smith | junior
5 | 10 | john30003 | smith | junior
2 | 10 | john30003 | smith | junior

(4 rows)

Tracing session: 66a845c0-d5f3-11e3-bd26-a322c40b8b81

activity | timestamp | source | source_elapsed
-------------------------------------------------------------------------------------------------+--------------+---------------+----------------
execute_cql3_query | 22:25:12,732 | <node3_ip> | 0
Message received from /<node3_ip> | 22:25:11,106 | <node2_ip> | 26
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:11,106 | <node2_ip> | 289
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 442
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 581
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 658
Read 1 live and 0 tombstoned cells | 22:25:11,106 | <node2_ip> | 724
Scanned 4 rows and matched 4 | 22:25:11,106 | <node2_ip> | 760
Enqueuing response to /<node3_ip> | 22:25:11,106 | <node2_ip> | 785
Sending message to /<node3_ip> | 22:25:11,107 | <node2_ip> | 954
Message received from /<node3_ip> | 22:25:12,430 | <node1_ip> | 76
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,431 | <node1_ip> | 1054
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1250
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1399
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1537
Read 1 live and 0 tombstoned cells | 22:25:12,432 | <node1_ip> | 1718
Scanned 4 rows and matched 4 | 22:25:12,432 | <node1_ip> | 1777
Enqueuing response to /<node3_ip> | 22:25:12,432 | <node1_ip> | 1935
Sending message to /<node3_ip> | 22:25:12,433 | <node1_ip> | 2212
Parsing select * from users LIMIT 10000; | 22:25:12,732 | <node3_ip> | 148
Preparing statement | 22:25:12,732 | <node3_ip> | 259
Determining replicas to query | 22:25:12,733 | <node3_ip> | 941
Enqueuing request to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5556
Enqueuing request to /<node1_ip> | 22:25:12,738 | <node3_ip> | 5645
Enqueuing request to <node3_hostname>/<node3_ip> | 22:25:12,738 | <node3_ip> | 5688
Sending message to /<node2_ip> | 22:25:12,738 | <node3_ip> | 5811
Sending message to /192.168.0.2 | 22:25:12,738 | <node3_ip> | 5817
Sending message to /<node1_ip> | 22:25:12,738 | <node3_ip> | 6133
Message received from /<node3_ip> | 22:25:12,739 | <node3_ip> | 6558
Executing seq scan across 0 sstables for [min(-9223372036854775808), min(-9223372036854775808)] | 22:25:12,740 | <node3_ip> | 7294
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7506
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 7698
Read 1 live and 0 tombstoned cells | 22:25:12,740 | <node3_ip> | 8222
Read 1 live and 0 tombstoned cells | 22:25:12,741 | <node3_ip> | 8570
Scanned 4 rows and matched 4 | 22:25:12,741 | <node3_ip> | 8634
Enqueuing response to /<node3_ip> | 22:25:12,741 | <node3_ip> | 8689
Sending message to /192.168.0.2 | 22:25:12,741 | <node3_ip> | 8821
Message received from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Processing response from /<node3_ip> | 22:25:12,742 | <node3_ip> | null
Message received from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Processing response from /<node1_ip> | 22:25:13,029 | <node3_ip> | null
Message received from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Processing response from /<node2_ip> | 22:25:13,061 | <node3_ip> | null
Read 5 live and 0 tombstoned cells | 22:25:13,086 | <node3_ip> | 353631
Read 5 live and 0 tombstoned cells | 22:25:13,087 | <node3_ip> | 355232
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360675
Read 5 live and 0 tombstoned cells | 22:25:13,093 | <node3_ip> | 360908
Request complete | 22:25:13,093 | <node3_ip> | 361266

cqlsh:jw_schema1>

TRACE [Thrift:186] 2014-05-07 22:25:12,733 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@43e049 @CL.ALL
DEBUG [Thrift:186] 2014-05-07 22:25:13,604 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,605 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@17100f1 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:13,911 Tracing.java (line 159) request complete
DEBUG [Thrift:186] 2014-05-07 22:25:13,915 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-07 22:25:13,916 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@d34f6 @CL.ONE
DEBUG [Thrift:186] 2014-05-07 22:25:14,227 Tracing.java (line 159) request complete

As write entry path is execute_cql3_query, so is read path. If you trace the code down, it will be too much to even start the discussion. I summarize the points below in tandem with the output of cqlsh tracing and system.log where applicable. Thus it may not be complete but I will give you the link as narration goes so that you can study yourself in detail.

It started at CassandraServer.execute_cql3_query(...)  as indicated in cqlsh tracing output. So basically the work done can be summarize by this line:
cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();

If you step into the code above, QueryProcessor.process(...)  which implement the interface QueryHandler  which get a valid CQLStatement. The execution continue by calling method QueryProcessor.processStatement(...). Notice that the logger in this method is shown in cassandra system.log (of cause you need to enable tracing for this class in log4j.properties in order for this line to log successfully). So access checking and validation are perform here. When checking and validation were done, then CQLStatement.execute(...) is executed.

Because we are executing select statement, the correspond class that implement interface CQLStatement is SelectStatement.  Extract from SelectStatement.execute(...)
public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
ConsistencyLevel cl = options.getConsistency();
List<ByteBuffer> variables = options.getValues();
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");

cl.validateForRead(keyspace());

int limit = getLimit(variables);
long now = System.currentTimeMillis();
Pageable command;
if (isKeyRange || usesSecondaryIndexing)
{
command = getRangeCommand(variables, limit, now);
}
else
{
List<ReadCommand> commands = getSliceCommands(variables, limit, now);
command = commands == null ? null : new Pageable.ReadCommands(commands);
}

int pageSize = options.getPageSize();
// A count query will never be paged for the user, but we always page it internally to avoid OOM.
// If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
// Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20)
pageSize = DEFAULT_COUNT_PAGE_SIZE;

if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
{
return execute(command, cl, variables, limit, now);
}
else
{
QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
if (parameters.isCount)
return pageCountQuery(pager, variables, pageSize, now);

// We can't properly do post-query ordering if we page (see #6722)
if (needsPostQueryOrdering())
throw new InvalidRequestException("Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the "
+ "ORDER BY or the IN and sort client side, or disable paging for this query");

List<Row> page = pager.fetchPage(pageSize);
ResultMessage.Rows msg = processResults(page, variables, limit, now);
if (!pager.isExhausted())
msg.result.metadata.setHasMorePages(pager.state());
return msg;
}
}

The execution continue to get the Pageable Command . Execution continue to private method execute(...). Then method getRangeSlice(...) is called. This is the actual work done to retrieve all the rows. This method implementation does a lot of works and I would recommend you click on the link and study the code yourself to get a better picture.

When the control is returned, the rows are sent for further processing using method processResults(...)  which eventually return the result back to the cassandra client.

As you may have notice, the upper layer execution is similar as to write path execution, until control passed to CQLStatement. That's it for this article, I hope you like it.

Friday, May 9, 2014

Understand cassandra write path by tracing in CQL

In this article, we will learn the write path for cassandra 2.0.7. Since cql is the way moving forward, we will start learning write path by focusing on cqlsh. Let's turn on the tracing, consistency to all and insert one row of data. Read output below:
cqlsh:jw_schema1> tracing on;
Now tracing requests.
cqlsh:jw_schema1> consistency all;
Consistency level set to ALL.

cqlsh:jw_schema1> insert into users (user_id, age, first, last, middle) values ('1', 10, 'john30003', 'smith', 'junior');

Tracing session: 03477650-d43f-11e3-bd26-a322c40b8b81

activity | timestamp | source | source_elapsed
-----------------------------------------------------------------------------------------------------------------+--------------+---------------+----------------
execute_cql3_query | 18:21:25,430 | <node1_ip> | 0
Message received from /<node1_ip> | 18:21:23,795 | <node2_ip> | 52
Acquiring switchLock read lock | 18:21:23,795 | <node2_ip> | 455
Appending to commitlog | 18:21:23,795 | <node2_ip> | 497
Adding to users memtable | 18:21:23,795 | <node2_ip> | 613
Enqueuing response to /<node1_ip> | 18:21:23,800 | <node2_ip> | 5520
Sending message to /<node1_ip> | 18:21:23,801 | <node2_ip> | 6359
Message received from /<node1_ip> | 18:21:25,121 | <node3_ip> | 84
Acquiring switchLock read lock | 18:21:25,123 | <node3_ip> | 1777
Appending to commitlog | 18:21:25,123 | <node3_ip> | 1826
Adding to users memtable | 18:21:25,123 | <node3_ip> | 2121
Enqueuing response to /<node1_ip> | 18:21:25,129 | <node3_ip> | 8278
Sending message to /<node1_ip> | 18:21:25,129 | <node3_ip> | 8563
Parsing insert into users (user_id, age, first, last, middle) values ('1', 10, 'john30003', 'smith', 'junior'); | 18:21:25,430 | <node1_ip> | 93
Preparing statement | 18:21:25,430 | <node1_ip> | 227
Determining replicas for mutation | 18:21:25,433 | <node1_ip> | 2721
Sending message to /<node2_ip> | 18:21:25,433 | <node1_ip> | 3525
Sending message to /<node3_ip> | 18:21:25,434 | <node1_ip> | 3751
Acquiring switchLock read lock | 18:21:25,434 | <node1_ip> | 3963
Appending to commitlog | 18:21:25,434 | <node1_ip> | 3992
Adding to users memtable | 18:21:25,434 | <node1_ip> | 4067
Message received from /<node3_ip> | 18:21:25,730 | <node1_ip> | 300016
Processing response from /<node3_ip> | 18:21:25,730 | <node1_ip> | 300178
Message received from /<node2_ip> | 18:21:25,738 | <node1_ip> | 308225
Processing response from /<node2_ip> | 18:21:25,738 | <node1_ip> | 308676
Request complete | 18:21:25,738 | <node1_ip> | 308825

TRACE [Thrift:186] 2014-05-05 18:24:33,825 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@17d2390 @CL.ALL
DEBUG [Thrift:186] 2014-05-05 18:24:34,621 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-05 18:24:34,622 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@159d495 @CL.ONE
DEBUG [Thrift:186] 2014-05-05 18:24:34,623 Tracing.java (line 159) request complete
DEBUG [Thrift:186] 2014-05-05 18:24:34,626 CassandraServer.java (line 1955) execute_cql3_query
TRACE [Thrift:186] 2014-05-05 18:24:34,626 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.SelectStatement@75219b @CL.ONE
DEBUG [Thrift:186] 2014-05-05 18:24:34,629 Tracing.java (line 159) request complete

If you noticed, the entry path will be execute_cql3_query no matter write or read. If you trace the code down, it will be too much to even start the discussion. I summarize the points below in tandem with the output of cqlsh tracing and system.log where applicable. Thus it may not be complete but I will give you the link to the code as narration goes so that you can study yourself in detail.

It started at CassandraServer.execute_cql3_query(...)  as indicated in cqlsh tracing output. So basically the work done can be summarize by this line:
cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();

If you step into the line above, QueryProcessor.process(...) which implement the interface QueryHandler which get a valid CQLStatement. The execution continue by calling method QueryProcessor.processStatement(...). Notice that the logger in this method is shown in cassandra system.log (of cause you need to enable tracing for this class in log4j.properties in order for this line to log successfully). So access checking and validation are perform here. When checking and validation were done, then CQLStatement.execute(...) is executed. Because we are adding a new row by inserting a new row of data, the correspond class that implement interface CQLStatement is ModificationStatement.  Extract from ModificationStatement.execute(...)
public ResultMessage execute(QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
if (options.getConsistency() == null)
throw new InvalidRequestException("Invalid empty consistency level");

if (hasConditions() && options.getProtocolVersion() == 1)
throw new InvalidRequestException("Conditional updates are not supported by the protocol version in use. You need to upgrade to a driver using the native protocol v2.");

return hasConditions()
? executeWithCondition(queryState, options)
: executeWithoutCondition(queryState, options);
}

The execution continue to the method ModificationStatement.executeWithoutCondition(...)  as our insert statement does not contain if not exists. Method getMutations(...) return a collection of mutations to be perform.

The collections of mutation is pass to StorageProxy.mutateWithTriggers(...) for further processing. This column family does not have trigger, so the execution continue to method StorageProxy.mutate() . The description of this method is informative, it write:

Use this method to have these Mutations applied across all replicas. This method will take care of the possibility of a replica being down and hint the data across to some other replica.

So this method basically does saving of data by applying to all replicas. If you trace along this path, you should notice the cqlsh tracing debug output appear along the way.

That's it for this article, for my next article, we will trace for cassandra read path. Thank you.