In Cassandra 2.0, experimental triggers were introduced, and this seems exciting because it could bring Cassandra to a whole new level. Today, using Cassandra 2.0.7, we are going to learn about Cassandra triggers. But first, let's understand what a conventional database trigger is.

Excerpt from Wikipedia:

A database trigger is procedural code that is automatically executed in response to certain events on a particular table or view in a database. The trigger is mostly used for maintaining the integrity of the information on the database. For example, when a new record (representing a new worker) is added to the employees table, new records should also be created in the tables of the taxes, vacations and salaries.

So let's create a table in Cassandra and then create a trigger for that table. We will do all of this via cqlsh, and the example we are going to follow is available in this link. Below are the steps I took after studying the example trigger code.
1. Build the Cassandra jar files in the Cassandra base directory.
2. Build trigger-example.jar from the trigger example directory.
3. Upload trigger-example.jar to the directory /etc/cassandra/triggers on the Cassandra node.
4. Copy InvertedIndex.properties to the directory /etc/cassandra/ on the Cassandra node.
5. Make Cassandra aware of the new jar and properties file via nodetool reloadtriggers:
nodetool -h localhost reloadtriggers
6. Repeat steps 3 and 4 for all the nodes in the cluster.
7. Create the column family invertedindex via cqlsh.
8. Create the column family standard1 via cqlsh (a sketch of the cqlsh statements for steps 7 and 8 follows this list).
9. Create the trigger via cqlsh:
CREATE TRIGGER test1 ON "Keyspace1"."Standard1" USING 'org.apache.cassandra.triggers.InvertedIndex';
Note that you can also drop the trigger via DROP TRIGGER test1 ON "Keyspace1"."Standard1";
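For steps 7 and 8, the cqlsh statements would look roughly like the sketch below. The column definitions here are my assumption, based only on the user_id and age columns that appear in the insert attempted further down; the schemas I eventually ended up with are shown later in this article.

cqlsh:keyspace1> CREATE TABLE invertedindex (user_id int PRIMARY KEY, age int);
cqlsh:keyspace1> CREATE TABLE standard1 (user_id int PRIMARY KEY, age int);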
So the exciting part comes: when I tried to insert, the response kept complaining that the key may not be empty. It is strange, because we did specify user_id as our key, yet it kept giving the error. So what went wrong?
cqlsh:keyspace1> insert into standard1 (user_id, age) values (124, 11);
Bad Request: Key may not be empty
TRACE [Thrift:5] 2014-05-12 22:17:02,492 QueryProcessor.java (line 153) Process org.apache.cassandra.cql3.statements.UpdateStatement@164b11c @CL.ONE
DEBUG [Thrift:5] 2014-05-12 22:17:02,493 Tracing.java (line 159) request complete
ERROR [Thrift:5] 2014-05-12 22:17:02,493 CustomTThreadPoolServer.java (line 219) Error occurred during processing of message.
java.lang.RuntimeException: Exception while creating trigger on CF with ID: d04577ab-ecc0-3f57-bb01-6febc9d27803
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:167)
at org.apache.cassandra.triggers.TriggerExecutor.execute(TriggerExecutor.java:91)
at org.apache.cassandra.service.StorageProxy.mutateWithTriggers(StorageProxy.java:525)
at org.apache.cassandra.cql3.statements.ModificationStatement.executeWithoutCondition(ModificationStatement.java:542)
at org.apache.cassandra.cql3.statements.ModificationStatement.execute(ModificationStatement.java:526)
at org.apache.cassandra.cql3.QueryProcessor.processStatement(QueryProcessor.java:158)
at org.apache.cassandra.cql3.QueryProcessor.process(QueryProcessor.java:175)
at org.apache.cassandra.thrift.CassandraServer.execute_cql3_query(CassandraServer.java:1959)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4486)
at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4470)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:201)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:133)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:128)
at org.apache.cassandra.db.RowMutation.addOrGet(RowMutation.java:123)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:149)
at org.apache.cassandra.db.RowMutation.add(RowMutation.java:159)
at org.apache.cassandra.triggers.InvertedIndex.augment(InvertedIndex.java:46)
at org.apache.cassandra.triggers.TriggerExecutor.executeInternal(TriggerExecutor.java:159)
... 15 more
TRACE [Thrift:5] 2014-05-12 22:17:02,495 ThriftSessionManager.java (line 74) ClientState removed for socket ad
So I decided to dig further, and after spending hours I got it to work. The trace above ends in a NullPointerException inside InvertedIndex.augment, which suggests the trigger could not resolve the keyspace and column family named in InvertedIndex.properties. The changes are below.
1. Change the keyspace and column family names to lower case in InvertedIndex.properties (CQL folds unquoted identifiers to lower case, so the names stored in the schema are lower case and the properties file has to match them):

$ cat /etc/cassandra/InvertedIndex.properties
keyspace=keyspace1
columnfamily=invertedindex
2. Rebuild trigger-example.jar with a different augment method implementation (shown below), remember to deploy it to every node in the cluster, and run nodetool reloadtriggers again.
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.utils.ByteBufferUtil;

public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
{
    // clone only the metadata of the updated column family, without its cells
    ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
    // add a v2 column holding the constant value 999
    extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
                                     ByteBufferUtil.bytes(999)));
    // apply the extra column to the same partition key in keyspace1
    RowMutation rm = new RowMutation("keyspace1", key);
    rm.add(extraUpdate);
    return Collections.singletonList(rm);
}
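The idea here is that cloneMeShallow copies only the table metadata, not the cells of the incoming update, so the returned mutation carries nothing but the extra v2 column. Cassandra applies it alongside the original insert, which is why both v1 from the insert and the trigger-generated v2 show up in the query result further down.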
3. Drop both column families and recreate them; below are the schemas (a sketch of the cqlsh commands follows after them).
cqlsh:keyspace1> desc table invertedindex;
CREATE TABLE invertedindex (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};
cqlsh:keyspace1> desc table test_table;
CREATE TABLE test_table (
k int,
v1 int,
v2 int,
PRIMARY KEY (k)
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
comment='' AND
dclocal_read_repair_chance=0.000000 AND
gc_grace_seconds=864000 AND
index_interval=128 AND
read_repair_chance=0.100000 AND
replicate_on_write='true' AND
populate_io_cache_on_flush='false' AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};
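For completeness, step 3 in cqlsh would be along these lines. This is only a sketch: I am assuming the old standard1 table was replaced by test_table, and since dropping a table also drops its trigger, the trigger has to be created again on the new table for the behaviour below to work.

cqlsh:keyspace1> DROP TABLE standard1;
cqlsh:keyspace1> DROP TABLE invertedindex;
cqlsh:keyspace1> CREATE TABLE invertedindex (k int PRIMARY KEY, v1 int, v2 int);
cqlsh:keyspace1> CREATE TABLE test_table (k int PRIMARY KEY, v1 int, v2 int);
cqlsh:keyspace1> CREATE TRIGGER test1 ON test_table USING 'org.apache.cassandra.triggers.InvertedIndex';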
And now, when we insert into the column family again, voila: 999 was automatically created, and there are no more exceptions in the log or in the cqlsh output!
cqlsh:keyspace1> select * from test_table;
(0 rows)
cqlsh:keyspace1> insert into test_table (k, v1) values (0, 0);
cqlsh:keyspace1> select * from test_table;
k | v1 | v2
---+----+-----
0 | 0 | 999
(1 rows)
The conclusion we can draw is that, since triggers are experimental, they are subject to many changes in the future, including the API, and the chance that something fails is higher ;-). Building a trigger also requires both Cassandra and Java knowledge at the moment. Thus, you should not use this in production, but that does not mean you cannot try the feature. In fact, the Cassandra project would like to receive feedback on triggers in order to improve them and make Cassandra triggers production ready in the future.
That's it for this article. If you like it, please go to the donation page to contribute back, as funding will keep us writing in the future.