
Sunday, June 19, 2016

Investigating apache cassandra 1.2 jmx metrics Connection type WARN logging

Recently I got the opportunity to upgrade a production cassandra cluster from 1.1.12 to 1.2.19, and in the midst of the upgrade I noticed the following in the log file during boot up of a cassandra 1.2 instance.

 WARN 10:30:14,987 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=Timeouts
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=Timeouts
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:412)
    at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:16)
    at com.yammer.metrics.core.Meter.processWith(Meter.java:131)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newMeter(MetricsRegistry.java:240)
    at com.yammer.metrics.Metrics.newMeter(Metrics.java:245)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:106)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

As the logging level is WARN, I did not worry that much. Going into the code, it turns out that in cassandra 1.2 a metric known as ConnectionMetrics was added. This metric lives under the domain org.apache.cassandra.metrics, with type Connection and name Timeouts. It is not available in cassandra 1.1.

The same situation applies to CommandPendingTasks, ResponseCompletedTasks, ResponsePendingTasks and CommandCompletedTasks.


 WARN 10:38:58,079 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandPendingTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandPendingTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:71)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 07:52:19,882 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponseCompletedTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponseCompletedTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:99)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 09:06:07,059 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponsePendingTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponsePendingTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:92)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 02:13:09,861 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandCompletedTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandCompletedTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:78)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
This is indeed nothing to worry about. Once you are done upgrading all the nodes in the cluster, do another round of rolling restarts and this type of warning will disappear from the logs.
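
If you want to double-check after the restarts, you can list the Connection metric beans over JMX. Here is a minimal sketch (the host and port are assumptions; 7199 is the usual cassandra JMX port) using only the standard javax.management API:

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListConnectionMetrics
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // pattern matches every Connection-scoped metric, e.g. name=Timeouts
            Set<ObjectName> beans = mbs.queryNames(new ObjectName("org.apache.cassandra.metrics:type=Connection,*"), null);
            for (ObjectName bean : beans)
                System.out.println(bean);
        }
    }
}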


Friday, January 16, 2015

operate cassandra using jmx in terminal including changing pool size, altering key cache and compacting sstables

If you operate an apache cassandra cluster and the load per node grows huge (nodetool info showing 800GB), compactions become a problem. It's a big problem for apache cassandra 1.0.8 if your load per node averages around 600GB to 1TB. The read performance suffers and at times the system load goes high. In some instances, I noticed that when repair is running, system load goes above 20. It's not a concern if everything is operating well, but the more often you see this, the more likely something has gone wrong. Today, I will share my experience on how to operate cassandra when the node load is huge and the cassandra instance is still running. Often there are nice methods exposed via jmx, but to operate remotely, a jmx gui client such as jconsole is not ideal. Instead, we will use jmxterm for these operations in apache cassandra 1.0.8. So let's get started.

Changing pool size

So, it is pretty simple: launch jmxterm, set the bean, and then set the CorePoolSize. The steps are illustrated below.
$ java -jar jmxterm-1.0-alpha-4-uber.jar
$>open localhost:7199
#Connection to localhost:7199 is opened
$>bean org.apache.cassandra.request:type=ReplicateOnWriteStage
#bean is set to org.apache.cassandra.request:type=ReplicateOnWriteStage
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 32;
$>info
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage
#class name = org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor
# attributes
%0 - ActiveCount (int, r)
%1 - CompletedTasks (long, r)
%2 - CorePoolSize (int, rw)
%3 - CurrentlyBlockedTasks (int, r)
%4 - PendingTasks (long, r)
%5 - TotalBlockedTasks (int, r)
#there's no operations
#there's no notifications
$>set CorePoolSize 64
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 64;
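
If you prefer to script this instead of typing into jmxterm, the same attribute can be set from plain Java. A minimal sketch, assuming the default JMX port 7199 and the same ReplicateOnWriteStage bean used above:

import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SetCorePoolSize
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName pool = new ObjectName("org.apache.cassandra.request:type=ReplicateOnWriteStage");
            System.out.println("before: " + mbs.getAttribute(pool, "CorePoolSize"));
            // CorePoolSize is the read-write int attribute shown by jmxterm's info output
            mbs.setAttribute(pool, new Attribute("CorePoolSize", 64));
            System.out.println("after:  " + mbs.getAttribute(pool, "CorePoolSize"));
        }
    }
}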

Alter key cache

Often, when there is heap pressure in the jvm, the cache safety valve kicks in. You can restart the cassandra instance, or you can reset the key cache capacity back to its initial value. Assuming your column family is named FooBar and the keyspace is just4fun, the following steps illustrate how this is done.
$>bean org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#bean is set to org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
$>info
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#class name = org.apache.cassandra.cache.AutoSavingKeyCache
# attributes
%0 - Capacity (int, rw)
%1 - Hits (long, r)
%2 - RecentHitRate (double, r)
%3 - Requests (long, r)
%4 - Size (int, r)
#there's no operations
#there's no notifications
$>
$>get Size
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Size = 122307;

$>set Capacity 250000
#Value of attribute Capacity is set to 250000
$>get Capacity
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Capacity = 250000;

Compact sstable

Lastly, to compact sstables. It's amazing we have an sstable as huge as 84GB! So triggering a major compaction is not an option here; often, when load per node goes beyond 600GB, compaction takes forever as GC kicks in and the cpu keeps collecting heap, driving system load high. So here, we will select one sstable that is huge and compact only that. You can also select a few sstables and compact them together, separated by commas.
$>bean org.apache.cassandra.db:type=CompactionManager
#bean is set to org.apache.cassandra.db:type=CompactionManager
$>run forceUserDefinedCompaction just4fun FooBar-hc-5-Index.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#RuntimeMBeanException: java.lang.IllegalArgumentException: FooBar-hc-5-Index.db does not appear to be a data file
$>run forceUserDefinedCompaction just4fun FooBar-hc-401-Data.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#operation returns:
null
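
The same operation can also be invoked from plain Java rather than jmxterm. A minimal sketch, assuming the default JMX port 7199 and the two String arguments (keyspace, then data file list) used in the session above:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UserDefinedCompaction
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName cm = new ObjectName("org.apache.cassandra.db:type=CompactionManager");
            // same two arguments as the jmxterm run above: keyspace, then the -Data.db file(s)
            mbs.invoke(cm, "forceUserDefinedCompaction",
                       new Object[] { "just4fun", "FooBar-hc-401-Data.db" },
                       new String[] { "java.lang.String", "java.lang.String" });
        }
    }
}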

The compaction should now be running; you can check in the cassandra system log or with nodetool compactionstats. So that's it, I hope you learned something.

Sunday, April 13, 2014

Research into cassandra nodetool cfhistograms and interpreting its statistics

What is nodetool cfhistograms?

According to the official documentation definition: The nodetool cfhistograms command provides statistics about a table, including read/write latency, row size, column count, and number of SSTables.

If you look at the picture output below, it is entirely different from the cfhistograms output in cassandra 2.0.6. Apparently the output of cfhistograms has been simplified and improved! You can find more information about this improvement here. To get the old style of output, give --compact to nodetool as a parameter.



Okay, let's start by issuing the command nodetool cfhistograms against our cluster.
jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users
jw_schema1/users histograms

SSTables per Read
1 sstables: 997

Write Latency (microseconds)
No Data

Read Latency (microseconds)
103 us: 1
124 us: 15
149 us: 28
179 us: 131
215 us: 306
258 us: 373
310 us: 66
372 us: 17
446 us: 6
535 us: 21
642 us: 10
770 us: 2
924 us: 1
1109 us: 3
1331 us: 1
1597 us: 1
1916 us: 3
2299 us: 0
2759 us: 2
3311 us: 1
3973 us: 0
4768 us: 0
5722 us: 1
6866 us: 0
8239 us: 1
9887 us: 4
11864 us: 1
14237 us: 1
17084 us: 1

Partition Size (bytes)
149 bytes: 3

Cell Count per Partition
5 cells: 3

The statistics are a bit difficult to understand if you do not know what they mean. Let's begin by studying the cfhistograms code.
private void printCfHistograms(String keySpace, String columnFamily, PrintStream output, boolean compactFormat)
{
    ColumnFamilyStoreMBean store = this.probe.getCfsProxy(keySpace, columnFamily);

    // default is 90 offsets
    long[] offsets = new EstimatedHistogram().getBucketOffsets();

    long[] rrlh = store.getRecentReadLatencyHistogramMicros();
    long[] rwlh = store.getRecentWriteLatencyHistogramMicros();
    long[] sprh = store.getRecentSSTablesPerReadHistogram();
    long[] ersh = store.getEstimatedRowSizeHistogram();
    long[] ecch = store.getEstimatedColumnCountHistogram();

    output.println(String.format("%s/%s histograms", keySpace, columnFamily));
    output.println("");

    if (compactFormat)
    {
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                                     "Offset", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
        output.println(String.format("%-10s%10s%18s%18s%18s%18s",
                                     "", "", "(micros)", "(micros)", "(bytes)", ""));

        for (int i = 0; i < offsets.length; i++)
        {
            output.println(String.format("%-10d%10s%18s%18s%18s%18s",
                                         offsets[i],
                                         (i < sprh.length ? sprh[i] : "0"),
                                         (i < rwlh.length ? rwlh[i] : "0"),
                                         (i < rrlh.length ? rrlh[i] : "0"),
                                         (i < ersh.length ? ersh[i] : "0"),
                                         (i < ecch.length ? ecch[i] : "0")));
        }
    }
    else
    {
        output.println("SSTables per Read");
        printHistogram(sprh, offsets, "sstables", output);

        output.println("Write Latency (microseconds)");
        printHistogram(rwlh, offsets, "us", output);

        output.println("Read Latency (microseconds)");
        printHistogram(rrlh, offsets, "us", output);

        output.println("Partition Size (bytes)");
        printHistogram(ersh, offsets, "bytes", output);

        output.println("Cell Count per Partition");
        printHistogram(ecch, offsets, "cells", output);
    }
}

Essentially a proxy ColumnFamilyStoreMBean is obtained through jmx ($ jconsole service:jmx:rmi:///jndi/rmi://192.168.0.2:7199/jmxrmi, also see picture below) based on the keyspace and column family specified as nodetool parameters. The number of bucket offsets defaults to 90, so if you carefully analyze the row output of the compact statistics, you will notice exactly 90 rows each time the nodetool cfhistograms command is triggered.



You might ask, why 90 bucket offsets? Well, according to the code documentation:
The series of values to which the counts in `buckets` correspond:
1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, etc.
Thus, a `buckets` of [0, 0, 1, 10] would mean we had seen one value of 3 and 10 values of 4.

The series starts at 1 and grows by 1.2 each time (rounding and removing duplicates). It goes from 1
to around 36M by default (creating 90+1 buckets), which will give us timing resolution from microseconds to
36 seconds, with less precision as the numbers get larger.

Each bucket represents values from (previous bucket offset, current offset].
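
To make that growth rule concrete, here is a small standalone sketch (my own illustration, not the actual EstimatedHistogram code) that reproduces the documented series by starting at 1, multiplying by 1.2, rounding, and skipping duplicates:

import java.util.ArrayList;
import java.util.List;

public class BucketOffsets
{
    public static void main(String[] args)
    {
        List<Long> offsets = new ArrayList<Long>();
        long last = 1;
        offsets.add(last);
        while (offsets.size() < 90)
        {
            // max(last + 1, ...) drops duplicates by forcing a strictly increasing series
            long next = Math.max(last + 1, Math.round(last * 1.2));
            offsets.add(next);
            last = next;
        }
        System.out.println(offsets); // 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, ...
    }
}

Running it prints the series 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, ... which matches the Offset column of the compact output later in this post.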

Depending on whether the compact parameter is specified, the output will be different. There are six metrics exposed; we will take a closer look at each.

  • offset | the bucket offset


Bucket offset from 149 (exclusive) to 179 (inclusive). Essentially this bucket contains latencies from just above 149 microseconds up to 179 microseconds.




  • SSTables | recent SSTables per read


With each read, the total number of sstables accessed is accounted for. Note that for each nodetool cfhistograms trigger for this keyspace and column family, this metric will be reset.


This metric will increase if there is any call into CollationController.java or CacheService.java.




  • Write Latency (micros) | recent write latency histogram in microseconds.


An array representing the latency histogram for write in microseconds. Note that for each nodetool cfhistograms trigger for this keyspace and column family, this metric will be reset.


This metric will increase if there is any call into ColumnFamilyStore.java, StorageProxy.java or WeightedQueue.java.




  • Read Latency (micros) | recent read latency histogram in microseconds.


An array representing the latency histogram for read in microseconds. Note that for each nodetool cfhistograms trigger for this keyspace and column family, this metric will be reset.




  • Partition Size (bytes) | estimated row size histogram


An estimation of row size in bytes. Note that for each nodetool cfhistograms trigger for this keyspace and column family, this metric will NOT be reset.


The metric is collected by iterating over the sstables, and get the estimated row size in bytes.




  • Cell Count | estimated column count histogram


Estimated number of columns. Note that for each nodetool cfhistograms trigger for this keyspace and column family, this metric will NOT be reset.


The metric is collected by iterating over the sstables, and get the estimated column count.


So with these interpretations from the code, let's take another compact-form cfhistograms output and interpret the metrics. First, we will start by generating some statistics:
cqlsh:jw_schema1> select * from users where age > 5 and age < 50 and last = 'smith' allow filtering;

jason@localhost:~$ nodetool -h localhost cfhistograms jw_schema1 users -c
jw_schema1/users histograms

Offset SSTables Write Latency Read Latency Partition Size Cell Count
(micros) (micros) (bytes)
1 997 0 0 0 0
2 0 0 0 0 0
3 0 0 0 0 0
4 0 0 0 0 0
5 0 0 0 0 1000
6 0 0 0 0 0
7 0 0 0 0 0
8 0 0 0 0 0
10 0 0 0 0 0
12 0 0 0 0 0
14 0 0 0 0 0
17 0 0 0 0 0
20 0 0 0 0 0
24 0 0 0 0 0
29 0 0 0 0 0
35 0 0 0 0 0
42 0 0 0 0 0
50 0 0 0 0 0
60 0 0 0 0 0
72 0 0 0 0 0
86 0 0 0 0 0
103 0 0 0 0 0
124 0 0 0 0 0
149 0 0 0 999 0
179 0 0 0 1 0
215 0 0 0 0 0
258 0 0 0 0 0
310 0 0 0 0 0
372 0 0 0 0 0
446 0 0 0 0 0
535 0 0 0 0 0
642 0 0 0 0 0
770 0 0 0 0 0
924 0 0 0 0 0
1109 0 0 0 0 0
1331 0 0 51 0 0
1597 0 0 491 0 0
1916 0 0 95 0 0
2299 0 0 53 0 0
2759 0 0 84 0 0
3311 0 0 95 0 0
3973 0 0 41 0 0
4768 0 0 32 0 0
5722 0 0 25 0 0
6866 0 0 9 0 0
8239 0 0 7 0 0
9887 0 0 6 0 0
11864 0 0 4 0 0
14237 0 0 0 0 0
17084 0 0 2 0 0
20501 0 0 0 0 0
24601 0 0 0 0 0
29521 0 0 0 0 0
35425 0 0 0 0 0
42510 0 0 1 0 0
51012 0 0 0 0 0
61214 0 0 0 0 0
73457 0 0 0 0 0
88148 0 0 0 0 0
105778 0 0 1 0 0
126934 0 0 0 0 0
152321 0 0 0 0 0
182785 0 0 0 0 0
219342 0 0 0 0 0
263210 0 0 0 0 0
315852 0 0 0 0 0
379022 0 0 0 0 0
454826 0 0 0 0 0
545791 0 0 0 0 0
654949 0 0 0 0 0
785939 0 0 0 0 0
943127 0 0 0 0 0
1131752 0 0 0 0 0
1358102 0 0 0 0 0
1629722 0 0 0 0 0
1955666 0 0 0 0 0
2346799 0 0 0 0 0
2816159 0 0 0 0 0
3379391 0 0 0 0 0
4055269 0 0 0 0 0
4866323 0 0 0 0 0
5839588 0 0 0 0 0
7007506 0 0 0 0 0
8409007 0 0 0 0 0
10090808 0 0 0 0 0
12108970 0 0 0 0 0
14530764 0 0 0 0 0
17436917 0 0 0 0 0
20924300 0 0 0 0 0
25109160 0 0 0 0 0


  • There are 51 read requests that took between 1109 and 1331 microseconds.

  • 997 reads accessed exactly 1 sstable each; the SSTables column counts sstables touched per read.

  • Because this is a read operation (a cql select statement), there is no write latency involved.

  • For partition size, 999 partitions fall into the 149-byte bucket and one more into the 179-byte bucket.

  • There are 1000 partitions with 5 cells each.


These metrics are good for monitoring if you poll them periodically and plot them into graphs. Note that many of the methods covered above have been deprecated in this cassandra version, will probably be removed in a coming release, and should be replaced by a better way of depicting the metrics. If you started on an older cassandra version, for example pre-cassandra 1.1, a cell corresponds to a column whilst a partition corresponds to a row.
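
As a starting point for such polling, here is a minimal sketch that reads the raw read-latency histogram over JMX. The bean name pattern is an assumption based on how nodetool resolves the column family proxy, so verify it in jconsole first; also remember the "recent" attributes reset each time they are read:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class PollReadLatency
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // assumed bean name pattern for the jw_schema1.users column family
            ObjectName cf = new ObjectName(
                "org.apache.cassandra.db:type=ColumnFamilies,keyspace=jw_schema1,columnfamily=users");
            // counts per bucket; pair each index with the 90 bucket offsets to plot a histogram
            long[] counts = (long[]) mbs.getAttribute(cf, "RecentReadLatencyHistogramMicros");
            for (int i = 0; i < counts.length; i++)
                if (counts[i] > 0)
                    System.out.println("bucket " + i + ": " + counts[i]);
        }
    }
}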

Thank you.

Saturday, April 12, 2014

Investigating cassandra 1.0.8 compaction

So what happens when you trigger a compaction via nodetool? In a nutshell, it goes into a series of low level java calls.

The execution starts in NodeCmd.java and passes through NodeProbe.java, StorageServiceMBean.java, StorageService.java, ColumnFamilyStore.java, CompactionManager.java, AbstractCompactionTask.java and CompactionTask.java.

Once a NodeProbe object is established, the method forceTableCompaction(...) is called. Within NodeProbe there is a StorageServiceMBean, the JMX bean interface implemented by the class StorageService.

What forceTableCompaction(...) does is iterate over the column families and start a major compaction on each. Code snippet below:
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
    for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
    {
        cfStore.forceMajorCompaction();
    }
}

So it is pretty clear that the execution proceeds by getting the valid column families and calling forceMajorCompaction() on each. What actually happens is that, within forceMajorCompaction(), this object (ColumnFamilyStore) is passed to the CompactionManager singleton to perform an operation known as maximal.

Within the CompactionManager class, the cfStore object is processed concurrently: it is submitted as a task to an executor. To explain better, let's read the general compaction framework below:
public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
{
    Callable<Object> callable = new Callable<Object>()
    {
        public Object call() throws IOException
        {
            // acquire the write lock long enough to schedule all sstables
            compactionLock.writeLock().lock();
            try
            {
                if (!cfStore.isValid())
                    return this;

                AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                {
                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                        return this;
                    try
                    {
                        // downgrade the lock acquisition
                        compactionLock.readLock().lock();
                        compactionLock.writeLock().unlock();
                        try
                        {
                            return task.execute(executor);
                        }
                        finally
                        {
                            compactionLock.readLock().unlock();
                        }
                    }
                    finally
                    {
                        task.unmarkSSTables();
                    }
                }
            }
            finally
            {
                // we probably already downgraded
                if (compactionLock.writeLock().isHeldByCurrentThread())
                    compactionLock.writeLock().unlock();
            }
            return this;
        }
    };
    return executor.submit(callable);
}

To summarize:

  • the compaction write lock is acquired.

  • the cfStore object is checked again to make sure it is still valid.

  • the compaction strategy is retrieved from the cfStore object.

  • mark SSTables for compaction.

  • the task is executed on the CompactionExecutor.
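
The subtle part above is the lock downgrade: the task is scheduled while holding the write lock but executed while holding only the read lock, which ReentrantReadWriteLock permits (write to read, never the reverse). A standalone sketch of the pattern, my own illustration rather than cassandra code:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockDowngrade
{
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void scheduleThenExecute(Runnable schedule, Runnable execute)
    {
        lock.writeLock().lock(); // exclusive while scheduling/marking state
        try
        {
            schedule.run();
            lock.readLock().lock();    // downgrade: grab the read lock first...
            lock.writeLock().unlock(); // ...then release the write lock
            try
            {
                execute.run(); // long-running work under the shared lock
            }
            finally
            {
                lock.readLock().unlock();
            }
        }
        finally
        {
            // only unlock here if we never downgraded (e.g. schedule threw)
            if (lock.writeLock().isHeldByCurrentThread())
                lock.writeLock().unlock();
        }
    }
}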


Currently there are two compaction strategies in this version, SizeTieredCompactionStrategy and LeveledCompactionStrategy; this discussion continues based on SizeTieredCompactionStrategy.

The real compaction work is done here.
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
    // The collection of sstables passed may be empty (but not null); even if
    // it is not empty, it may compact down to nothing if all rows are deleted.
    assert sstables != null;

    Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
    if (!isCompactionInteresting(toCompact))
        return 0;

    if (compactionFileLocation == null)
        compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    if (partialCompactionsAcceptable())
    {
        // If the compaction file path is null that means we have no space left for this compaction.
        // Try again w/o the largest one.
        if (compactionFileLocation == null)
        {
            while (compactionFileLocation == null && toCompact.size() > 1)
            {
                logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
                // Note that we have removed files that are still marked as compacting. This suboptimal but ok since the caller will unmark all
                // the sstables at the end.
                toCompact.remove(cfs.getMaxSizeFile(toCompact));
                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
            }
        }

        if (compactionFileLocation == null)
        {
            logger.warn("insufficient space to compact even the two smallest files, aborting");
            return 0;
        }
    }

    if (DatabaseDescriptor.isSnapshotBeforeCompaction())
        cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);

    // sanity check: all sstables must belong to the same cfs
    for (SSTableReader sstable : toCompact)
        assert sstable.descriptor.cfname.equals(cfs.columnFamily);

    CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
    // new sstables from flush can be added during a compaction, but only the compaction can remove them,
    // so in our single-threaded compaction world this is a valid way of determining if we're compacting
    // all the sstables (that existed when we started)
    logger.info("Compacting {}", toCompact);

    long startTime = System.currentTimeMillis();
    long totalkeysWritten = 0;

    long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
    long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
    long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
    if (logger.isDebugEnabled())
        logger.debug("Expected bloom filter size : " + keysPerSSTable);

    AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
                                  ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
                                  : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
    CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
    Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
    Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

    // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
    // replace the old entries. Track entries to preheat here until then.
    Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();

    Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
    Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();

    if (collector != null)
        collector.beginCompaction(ci);
    try
    {
        if (!nni.hasNext())
        {
            // don't mark compacted in the finally block, since if there _is_ nondeleted data,
            // we need to sync it (via closeAndOpen) first, so there is no period during which
            // a crash could cause data loss.
            cfs.markCompacted(toCompact);
            return 0;
        }

        SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
        writers.add(writer);
        while (nni.hasNext())
        {
            AbstractCompactedRow row = nni.next();
            if (row.isEmpty())
                continue;

            long position = writer.append(row);
            totalkeysWritten++;

            if (DatabaseDescriptor.getPreheatKeyCache())
            {
                for (SSTableReader sstable : toCompact)
                {
                    if (sstable.getCachedPosition(row.key, false) != null)
                    {
                        cachedKeys.put(row.key, position);
                        break;
                    }
                }
            }
            if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
            {
                SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                cachedKeyMap.put(toIndex, cachedKeys);
                sstables.add(toIndex);
                if (nni.hasNext())
                {
                    writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                    writers.add(writer);
                    cachedKeys = new HashMap<DecoratedKey, Long>();
                }
            }
        }
    }
    catch (Exception e)
    {
        for (SSTableWriter writer : writers)
            writer.abort();
        throw FBUtilities.unchecked(e);
    }
    finally
    {
        iter.close();
        if (collector != null)
            collector.finishCompaction(ci);
    }

    cfs.replaceCompactedSSTables(toCompact, sstables);
    // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
    for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
    {
        SSTableReader key = ssTableReaderMapEntry.getKey();
        for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
            key.cacheKey(entry.getKey(), entry.getValue());
    }

    long dTime = System.currentTimeMillis() - startTime;
    long startsize = SSTable.getTotalBytes(toCompact);
    long endsize = SSTable.getTotalBytes(sstables);
    double ratio = (double) endsize / (double) startsize;

    StringBuilder builder = new StringBuilder();
    builder.append("[");
    for (SSTableReader reader : sstables)
        builder.append(reader.getFilename()).append(",");
    builder.append("]");

    double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
    logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys at %fMB/s. Time: %,dms.",
                              builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
    logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
    return toCompact.size();
}

That's a lot of work done in this method. :-) I summarized some important points below:

  • checking if enough sstables are present to compact.

  • check if the disk space is sufficient for this compaction task.

  • snapshot before the compaction happens, if snapshot-before-compaction is configured.

  • check that the sstables to be compacted belong to the same column family.

  • CompactionExecutorStatsCollector begins compaction with the AbstractCompactionIterable.

  • create a compaction writer.

  • replace the old sstables with the newly compacted sstables.


I hope you enjoy this writing.