Showing posts with label cassandra1.0.8.

Sunday, July 3, 2016

apache cassandra 1.0.8: READ_STAGE threads holding references to sstables so that compaction cannot remove them

Back when I administered an apache cassandra 1.0.8 cluster, I noticed that a few sstables did not get removed even after compaction was done. The leftover sstables caused some administrative problems, and I suspected that the files might not get removed because reads were still referencing them.

 DataTracker.java  
   
   private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)  
   {  
     View currentView, newView;  
     do  
     {  
       currentView = view.get();  
       newView = currentView.replace(oldSSTables, replacements);  
     }  
     while (!view.compareAndSet(currentView, newView));  
   
     addNewSSTablesSize(replacements);  
     removeOldSSTablesSize(oldSSTables);  
   
     cfstore.updateCacheSizes();  
   }  

I supposed the replacement of the view and its sstables is atomic, and hence a read would pick up data from the new sstables. But I do not have enough high level knowledge of how the various subsystems in cassandra work together. If you have an idea, please do leave your comment below. In the meantime, a quick way to check whether the process is still holding the old files open is sketched below.
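
This is a minimal sketch, assuming a Linux node, the default data directory /var/lib/cassandra/data and a pgrep based pid lookup (all assumptions); it lists the sstable data files the cassandra jvm still has open, so a compacted-away sstable that still appears here is being held by something, for example a long running read.

# hedged sketch: which sstable data files does the cassandra process still hold open?
CASS_PID=$(pgrep -f CassandraDaemon | head -1)   # pid lookup is an assumption, adjust as needed
lsof -p "$CASS_PID" | grep 'Data.db'
# if an sstable that compaction already replaced still shows up here,
# a reader (or another subsystem) is still holding a reference to it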

This problem seemed to go away after we upgraded the cluster to 1.1. I know that by now (April 2016) cassandra 1.0, 1.1 or even 1.2 is ancient, but if you are still on 1.0 or pre-1.0, you should really move to cassandra 3.x or at least 2.x.

Sunday, March 29, 2015

My journey and experience on upgrading apache cassandra 1.0.8 to 1.0.12

At the request of a blog reader, today I will share my experience upgrading apache cassandra from version 1.0.8 to 1.0.12 on a live production cluster. By sharing this information, I hope that if you also run and/or administer a cassandra cluster, you can learn from my experience and ease your worry or pain.

First, let's lay out the current architecture of this environment.

  • java 6

  • 12-node cluster.

  • two spinning disks in raid 0, 32GB total system memory, of which 14GB is allocated to the cassandra heap with 800MB for the young generation, quad core cpu.

  • pretty much stock cassandra.yaml configuration, with a few differences such as concurrent_writes set to 64, flush_largest_memtables_at to 0.8 and compaction_throughput_mb_per_sec to 64.

  • average load per node of 500-550GB.


As you can see, this is pretty ancient cassandra as of this time of writing, but because cassandra has been rock solid serving read/write requests for years, it stayed in this stable condition and we leveraged the benefit of scaling out, adding nodes from six to nine and eventually to the current twelve. Disk failures did happen on nodes in the cluster, but because cassandra is designed with no single point of failure in mind, we could afford to lose a single node from operation while replacing it. Those were a few of the reasons we stayed with cassandra 1.0 for quite some time.

Because we would probably like to go to cassandra 2.0 and beyond, and java 6 has been EOL for quite some time, it would be wise to upgrade java before cassandra. Because the systems are integrated like an ecosystem, it would also be wise to look at the java version used by the client systems that send read/write requests to the cassandra cluster. So make a checklist of the clients that integrate with the cluster and then check which stable java 7 releases are available. Example:

cassandra 1.0 (cassandra-1.0.12) requires java 6 minimum.
https://github.com/apache/cassandra/tree/cassandra-1.0.12

hector client uses cassandra 2.0.4, so java 7 minimum
https://github.com/hector-client/hector/blob/master/pom.xml

datastax cql driver uses cassandra 2.1.2, so java 7 minimum
https://github.com/datastax/java-driver/blob/2.1/pom.xml

java 7 update release note
http://www.oracle.com/technetwork/java/javase/7u-relnotes-515228.html

features and enhancement
http://www.oracle.com/technetwork/java/javase/jdk7-relnotes-418459.html

java 7 in wiki http://en.wikipedia.org/wiki/Java_version_history#Java_SE_7_.28July_28.2C_2011.29

unicode
before upgrading, check whether the new java version supports a different unicode version for your data http://www.herongyang.com/Unicode/Java-Unicode-Version-Supported-in-Java-History.html
http://docs.oracle.com/javase/7/docs/technotes/guides/intl/enhancements.7.html
Early versions of the Java SE 7 release added support for Unicode 5.1.0. The final version of the Java SE 7 release supports Unicode 6.0.0. Unicode 6.0.0 is a major version of the Unicode Standard and adds support for over 2000 additional characters, as well as support for properties and data files.

At the time of checking, we picked java 7 update 72. Upgrading from java 6 to java 7 update 72 under cassandra 1.0.8 is a painless process, just time consuming, given the huge load per node and the total number of nodes in the cluster. I followed these steps for the java upgrade on each cassandra node.

upgrade java for all cassandra nodes
1. write a script to automatically install java 7 on the node, update the java thread stack size to 256k in cassandra-env.sh and set JAVA_HOME in cassandra.in.sh to java 7 (a sketch of such a script follows after this list).
2. execute the script in rolling fashion, upgrading one node at a time; for each node:
3. stop cassandra.
4. execute the script.
5. start the cassandra instance.
6.0 monitor after the node is up, checking the monitoring system after the node has been up for 30 minutes, 1 hour and 2 hours.
6.1 check your client can read/write to that one upgraded node.
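
Below is a minimal sketch of the per-node part of such a script. The jdk package name, install paths and the sed expression are assumptions rather than values from the original environment; treat it as an outline, not a drop-in script.

#!/bin/bash
# hedged sketch of the per-node java upgrade described above (paths and package names are assumptions)
set -e
JAVA7_HOME=/usr/java/jdk1.7.0_72                 # assumed install location for java 7u72
CASS_HOME=/opt/apache-cassandra-1.0.8            # assumed cassandra install directory

# 1. install java 7 (how you distribute the jdk is up to you)
sudo rpm -Uvh jdk-7u72-linux-x64.rpm

# 2. bump the thread stack size to 256k in cassandra-env.sh
sudo sed -i 's/-Xss[0-9]*k/-Xss256k/' "$CASS_HOME/conf/cassandra-env.sh"

# 3. point cassandra.in.sh at the new jdk
echo "JAVA_HOME=$JAVA7_HOME" | sudo tee -a "$CASS_HOME/bin/cassandra.in.sh"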

By now you can move on to the next node in the ring, and you can shorten step 6.0 once you are confident it is going to work. One thing I observed is that the gc duration for cassandra on java 7 is down by half compared to java 6! Faster gc means more cpu cycles to process other tasks and less stop-the-world time for the cassandra instance.

Leave the cluster running on java 7 for a day or two and, if it is okay, continue with the cassandra upgrade. So which cassandra version to upgrade to? There are several guidelines I followed.

1. choose ONLY a STABLE release for a production cluster. How to choose? You should read this link.
2. read NEWS.txt and CHANGES.txt. From time to time, changes to the code base may affect, for example, the sstable format, so pay attention especially across major cassandra upgrades.
3. read the code difference between your current version and the version you decided to upgrade to, for example for this upgrade: https://github.com/apache/cassandra/compare/cassandra-1.0.8...cassandra-1.0.12
4. read the datastax upgrade notes for the minor version.

I spent a lot of time on step 3: by reading the code difference, you realize what has been changed and/or added and can consider whether it will impact your cassandra environment. In order to later upgrade to cassandra 1.1, you will need to first upgrade to the latest release of the version currently deployed. Example here. Once you have read through the above checkpoints, you will probably have a lot of questions and TODOs, and those will give you further work. As the next step, it is best to work through those questions and TODOs and verify them on a test cluster before applying anything to the production cluster.

For me, I had written a few bash scripts, for example the java upgrade mentioned above, and I also wrote a script to install a test cluster for the cassandra upgrade. Remember to also write a script to snapshot the data directory using nodetool, and a script to automatically downgrade. When something goes wrong, you can revert using the automatic downgrade script and the backup taken with nodetool snapshot. You will also need to save the configuration files, for example cassandra.in.sh, cassandra-env.sh, cassandra.yaml or any others in your cluster environment. A hedged sketch of the snapshot and configuration backup is below.
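
This is a minimal sketch under the assumption of a keyspace named MyKeyspace, a tarball install under /opt and a /backup directory (all assumptions); the exact snapshot-name flag may differ on your nodetool version, so check nodetool help first.

# hedged sketch: snapshot the data and back up the configs before the upgrade
TAG="MyKeyspace-1.0.8-$(date +%Y%m%d)"          # snapshot name, as suggested above
nodetool -h localhost snapshot -t "$TAG" MyKeyspace
# keep a copy of the configuration files alongside
mkdir -p /backup/cassandra-1.0.8-conf
cp /opt/apache-cassandra-1.0.8/conf/cassandra.yaml \
   /opt/apache-cassandra-1.0.8/conf/cassandra-env.sh \
   /opt/apache-cassandra-1.0.8/bin/cassandra.in.sh \
   /backup/cassandra-1.0.8-conf/
# to roll back: stop the node, restore these configs and the snapshotted sstables, then start 1.0.8 again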

With these scripts written and tested, it is best to get acknowledgement from management on whether to proceed, and it would also be best to have someone else who administers the cassandra cluster with you, just for the good and the bad moments. ;-) You can also reach me via the follow button on the home page. :)

upgrade cassandra from 1.0.8 to 1.0.12

  1. stop repair and cleanup on all nodes in the cluster.

  2. write a script to automate the upgrade so that you don't panic, waste time, and can stay composed during the node upgrade. Trust me, it saves a lot of time and keeps the process free of human error (see the sketch after this list). The script content could be the following:
    - download cassandra 1.0.12, extract it, set file permissions, etc.
    - backup the current cassandra 1.0.8 data using nodetool snapshot. make sure you name the snapshot directory something like MyKeyspace-1.0.8-date
    - drain the node.
    - stop cassandra if it is not yet stopped.
    - update cassandra 1.0.12 with your cluster settings.

  3. check the configuration changes and then start the new cassandra 1.0.12 instance.

  4. monitor after the node is up and check the monitoring system after the node has been up for 30 minutes, 1 hour and 2 hours.

  5. check your client can read/write to that one upgraded node.
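
Below is a minimal sketch of such a script, under the assumption of a tarball install in /opt, the archive.apache.org download location and a keyspace named MyKeyspace (all assumptions); adapt the paths, init scripts and keyspace names to your environment, and check nodetool help for the exact snapshot flag on your version.

#!/bin/bash
# hedged sketch of the per-node 1.0.8 -> 1.0.12 upgrade steps listed above
set -e
cd /opt
wget http://archive.apache.org/dist/cassandra/1.0.12/apache-cassandra-1.0.12-bin.tar.gz
tar xzf apache-cassandra-1.0.12-bin.tar.gz

# backup current data with a descriptive snapshot name
nodetool -h localhost snapshot -t "MyKeyspace-1.0.8-$(date +%Y%m%d)" MyKeyspace

# flush memtables and stop accepting requests, then stop the old instance
nodetool -h localhost drain
/etc/init.d/cassandra stop || true

# carry the cluster settings over to the new version
cp /opt/apache-cassandra-1.0.8/conf/cassandra.yaml   /opt/apache-cassandra-1.0.12/conf/
cp /opt/apache-cassandra-1.0.8/conf/cassandra-env.sh /opt/apache-cassandra-1.0.12/conf/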


By now you can perform the upgrade on the next node in the ring, and you can skip step 4 once you are sure it is going to work. As the cassandra sstable version changed in 1.0.10 from hc to hd, it is best that all sstables on all nodes are on the hd version before performing the next major upgrade; a quick way to check is sketched below.
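
A minimal sketch, assuming the default data directory /var/lib/cassandra/data (an assumption), that counts the sstable versions present on a node:

# hedged sketch: count sstable versions on this node; move on only when everything reports hd
ls /var/lib/cassandra/data/*/*-Data.db \
  | sed 's/.*-\(h[a-z]\)-[0-9]*-Data.db/\1/' \
  | sort | uniq -c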

That's it for this article. Whilst it may not cover everything and may contain mistakes, if you want to comment, please leave your comment below.

Saturday, March 28, 2015

Investigate into apache cassandra corrupt sstable exception

Today, we will take a look at another apache cassandra 1.0.8 exception. Example of stack trace below.
ERROR [SSTableBatchOpen:2] 2015-03-07 06:11:58,544 SSTableReader.java (line 228) Corrupt sstable /var/lib/cassandra/data/MySuperKeyspace/MyColumnFamily-hc-6681=[Index.db, Statistics.db, CompressionInfo.db, Filter.db, Data.db]; skipped
java.io.IOException: Input/output error
at java.io.RandomAccessFile.readBytes0(Native Method)
at java.io.RandomAccessFile.readBytes(RandomAccessFile.java:350)
at java.io.RandomAccessFile.read(RandomAccessFile.java:385)
at org.apache.cassandra.io.util.RandomAccessReader.reBuffer(RandomAccessReader.java:128)
at org.apache.cassandra.io.util.RandomAccessReader.read(RandomAccessReader.java:302)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:444)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:424)
at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:324)
at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:393)
at org.apache.cassandra.io.sstable.SSTableReader.load(SSTableReader.java:375)
at org.apache.cassandra.io.sstable.SSTableReader.open(SSTableReader.java:186)
at org.apache.cassandra.io.sstable.SSTableReader$1.run(SSTableReader.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Before we go into the code base for this stacktrace: I had no idea what this was about, and it showed up while the cassandra 1.0.12 instance was booting up. The last thing I remember is that I triggered user defined compaction twice in cassandra 1.0.8 using the same sstables, and after the first compaction was done, this sstable stayed around forever... for two weeks plus. Then we upgraded cassandra.

Enough said, let's go into the code base and understand what is really meant by a corrupt sstable. The bottom of the stack trace is pretty obvious: a ThreadPoolExecutor executes a future task's run method. Then we are in the apache cassandra codebase, in class SSTableReader, method batchOpen(). Code snippet:
    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
final Set<DecoratedKey> savedKeys,
final DataTracker tracker,
final CFMetaData metadata,
final IPartitioner partitioner)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();

ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
{
Runnable runnable = new Runnable()
{
public void run()
{
SSTableReader sstable;
try
{
sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
}
catch (IOException ex)
{
logger.error("Corrupt sstable " + entry + "; skipped", ex);
return;
}
sstables.add(sstable);
}
};
executor.submit(runnable);
}

executor.shutdown();
try
{
executor.awaitTermination(7, TimeUnit.DAYS);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}

return sstables;

}

As can be read above, somewhere within the method open() an IOException is thrown, hence the above error was logged. Two stack frames up, we read that the sstable load method executes the ByteBufferUtil.read() method. The read method from class ByteBufferUtil is shown below.
    public static ByteBuffer read(DataInput in, int length) throws IOException
    {
        if (in instanceof FileDataInput)
            return ((FileDataInput) in).readBytes(length);

        byte[] buff = new byte[length];
        in.readFully(buff);
        return ByteBuffer.wrap(buff);
    }

We see that the input is an instance of a FileDataInput stream, and the bytes are read with the given length. Since FileDataInput is an interface, we find that the class implementing it here is RandomAccessReader, with the method readBytes as follows.
public ByteBuffer readBytes(int length) throws IOException
{
    assert length >= 0 : "buffer length should not be negative: " + length;

    byte[] buff = new byte[length];
    readFully(buff); // reading data buffer

    return ByteBuffer.wrap(buff);
}

Reading bytes with a length actually reads fully for that length, starting from wherever the current file pointer points. A little further up in the stack trace is the method reBuffer():
    /**
* Read data from file starting from current currentOffset to populate buffer.
* @throws IOException on any I/O error.
*/
protected void reBuffer() throws IOException
{
resetBuffer();

if (bufferOffset >= channel.size())
return;

channel.position(bufferOffset); // setting channel position

int read = 0;

while (read < buffer.length)
{
int n = super.read(buffer, read, buffer.length - read);
if (n < 0)
break;
read += n;
}

validBufferBytes = read;

bytesSinceCacheFlush += read;

if (skipIOCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
{
// with random I/O we can't control what we are skipping so
// it will be more appropriate to just skip a whole file after
// we reach threshold
CLibrary.trySkipCache(this.fd, 0, 0);
bytesSinceCacheFlush = 0;
}
}

and this method calls its superclass to read another chunk into the buffer. The superclass is RandomAccessFile, method readBytes():
    /**
* Reads a sub array as a sequence of bytes.
* @param b the buffer into which the data is read.
* @param off the start offset of the data.
* @param len the number of bytes to read.
* @exception IOException If an I/O error has occurred.
*/
private int readBytes(byte b[], int off, int len) throws IOException {
Object traceContext = IoTrace.fileReadBegin(path);
int bytesRead = 0;
try {
bytesRead = readBytes0(b, off, len);
} finally {
IoTrace.fileReadEnd(traceContext, bytesRead == -1 ? 0 : bytesRead);
}
return bytesRead;
}

private native int readBytes0(byte b[], int off, int len) throws IOException;

... and we are at the end of this path. It turns out that the call to readBytes0 threw the exception; the lower layer native, non-java call is throwing the IOException. You can use nodetool scrub to see if that fixes the problem, but what I did was basically wipe the data directory for cassandra and rebuild the node, after which I did not see this message anymore. A hedged sketch of both options is below.
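
This is a minimal sketch, assuming the keyspace and column family names from the log output above and an /etc/init.d service script (both assumptions); it mirrors the two options just mentioned rather than prescribing either.

# option 1: ask cassandra to rewrite the sstables, skipping unreadable rows
nodetool -h localhost scrub MySuperKeyspace MyColumnFamily

# option 2 (what I did): rebuild this node's data from its replicas
/etc/init.d/cassandra stop
rm -rf /var/lib/cassandra/data/MySuperKeyspace/*
/etc/init.d/cassandra start
nodetool -h localhost repair MySuperKeyspace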

That's it for this article; if you want to improve it and/or comment, please leave your input below.

Friday, March 27, 2015

Investigate into apache cassandra get_slice assertion error

Today, we will investigate another error from apache cassandra. Error as shown below in cassandra log.
ERROR [Thrift:2] 2015-02-11 11:06:10,837 Cassandra.java (line 3041) Internal error processing get_slice
java.lang.AssertionError
at org.apache.cassandra.locator.TokenMetadata.firstTokenIndex(TokenMetadata.java:518)
at org.apache.cassandra.locator.TokenMetadata.firstToken(TokenMetadata.java:532)
at org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalEndpoints(AbstractReplicationStrategy.java:94)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1992)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1986)
at org.apache.cassandra.service.StorageProxy.fetchRows(StorageProxy.java:604)
at org.apache.cassandra.service.StorageProxy.read(StorageProxy.java:564)
at org.apache.cassandra.thrift.CassandraServer.readColumnFamily(CassandraServer.java:128)
at org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:283)
at org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:365)
at org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:326)
at org.apache.cassandra.thrift.Cassandra$Processor$get_slice.process(Cassandra.java:3033)
at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:2889)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:187)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

The bottom three lines are pretty easy: a thread is run by the thread pool executor. As indicated by the code snippet below, a worker process is having trouble processing a request.
    try
{
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown. (but not sufficient,
// since process() can take arbitrarily long waiting for client input.
// See comments at the end of serve().)
while (!stopped_ && processor.process(inputProtocol, outputProtocol))
{
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
}
}

Skipping a few low level byte stream processing steps, we arrive at the class which actually implements the method get_slice. Read the code snippet below.
    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
logger.debug("get_slice");

state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
}

So we see another method is called, multigetSliceInternal. Read the code snippet below, where a few validations are performed on the data.
    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

List<ReadCommand> commands = new ArrayList<ReadCommand>();
if (predicate.column_names != null)
{
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(metadata, key);
commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
}
}
else
{
SliceRange range = predicate.slice_range;
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(metadata, key);
commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
}
}

return getSlice(commands, consistency_level);
}

Then the method getSlice is called, and the method readColumnFamily() is called in turn, as shown in the code snippet below.
  protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
// TODO - Support multiple column families per row, right now row only contains 1 column family
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();

if (consistency_level == ConsistencyLevel.ANY)
{
throw new InvalidRequestException("Consistency level any may not be applied to read operations");
}

List<Row> rows;
try
{
schedule(DatabaseDescriptor.getRpcTimeout());
try
{
rows = StorageProxy.read(commands, consistency_level);
}
finally
{
release();
}
}
catch (TimeoutException e)
{
logger.debug("... timed out");
throw new TimedOutException();
}
catch (IOException e)
{
throw new RuntimeException(e);
}

for (Row row: rows)
{
columnFamilyKeyMap.put(row.key, row.cf);
}
return columnFamilyKeyMap;
}

Another class, StorageProxy, is called to read the row in question; its read method is in the code snippet below.
    /**
* Performs the actual reading of a row out of the StorageService, fetching
* a specific set of column names from a given column family.
*/
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
throws IOException, UnavailableException, TimeoutException, InvalidRequestException
{
if (StorageService.instance.isBootstrapMode())
throw new UnavailableException();
long startTime = System.nanoTime();
List<Row> rows;
try
{
rows = fetchRows(commands, consistency_level);
}
finally
{
readStats.addNano(System.nanoTime() - startTime);
}
return rows;
}

The exception leads this investigation to the fetching of the rows, within the same class, in the method fetchRows. Code snippet below.
    /**
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
* 3. Wait for a response from R replicas
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<Row> rows = new ArrayList<Row>(initialCommands.size());
List<ReadCommand> commandsToRetry = Collections.emptyList();

do
{
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];

if (!commandsToRetry.isEmpty())
logger.debug("Retrying {} commands", commandsToRetry.size());

// send out read requests
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
assert !command.isDigestQuery();
logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);

List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);

RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
readCallbacks[i] = handler;

// The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
logger.debug("reading data from {}", dataPoint);
MessagingService.instance().sendRR(command, dataPoint, handler);
}

if (handler.endpoints.size() == 1)
continue;

// send the other endpoints a digest request
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
MessageProducer producer = null;
for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
{
logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
logger.debug("reading digest from {}", digestPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (producer == null)
producer = new CachingMessageProducer(digestCommand);
MessagingService.instance().sendRR(producer, digestPoint, handler);
}
}
}

// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<RepairCallback> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
ReadCallback<Row> handler = readCallbacks[i];
ReadCommand command = commands.get(i);
try
{
long startTime2 = System.currentTimeMillis();
Row row = handler.get();
if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}

if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
catch (TimeoutException ex)
{
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);

if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
repairResponseHandlers = new ArrayList<RepairCallback>();
}
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);

MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
MessagingService.instance().sendRR(producer, endpoint, repairHandler);
}
}

if (commandsToRetry != Collections.EMPTY_LIST)
commandsToRetry.clear();

// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
{
for (int i = 0; i < repairCommands.size(); i++)
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
// behind on writes in case the out-of-sync row is read multiple times in quick succession
FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());

Row row;
try
{
row = handler.get();
}
catch (DigestMismatchException e)
{
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
}

ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
if (retryCommand != null)
{
logger.debug("issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
commandsToRetry = new ArrayList<ReadCommand>();
commandsToRetry.add(retryCommand);
continue;
}

if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}
}
}
} while (!commandsToRetry.isEmpty());

return rows;
}

At this point of the investigation, the documentation of this method, fetchRows, is pretty useful for us.
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel

We see that this method executes reads on local and remote nodes, and the problem occurs while getting the nodes responsible for keeping the row. Let's read the method getLiveNaturalEndpoints(), as shown below.
    /**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
*
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key)
{
return getLiveNaturalEndpoints(table, partitioner.getToken(key));
}

public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);

for (InetAddress endpoint : endpoints)
{
if (FailureDetector.instance.isAlive(endpoint))
liveEps.add(endpoint);
}

return liveEps;
}

A little higher in the stack trace, the abstract class AbstractReplicationStrategy:
    /**
* get the (possibly cached) endpoints that should store the given Token
* Note that while the endpoints are conceptually a Set (no duplicates will be included),
* we return a List to avoid an extra allocation when sorting by proximity later
* @param searchToken the token the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken)
{
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
cacheEndpoint(keyToken, endpoints);
}

return new ArrayList<InetAddress>(endpoints);
}

Somehow the ring size is equal to or less than 0. Class TokenMetadata and the code snippet where the assertion is thrown:
    public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
    {
        assert ring.size() > 0;
        // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
        int i = Collections.binarySearch(ring, start);
        if (i < 0)
        {
            i = (i + 1) * (-1);
            if (i >= ring.size())
                i = insertMin ? -1 : 0;
        }
        return i;
    }

    public static Token firstToken(final ArrayList<Token> ring, Token start)
    {
        return ring.get(firstTokenIndex(ring, start, false));
    }

So something went wrong while reading a row's columns, and somehow the list of natural endpoints for the token is empty. My guess is that gossip was disabled, so the ring metadata was empty. The solution is to enable gossip and then restart the cassandra instance; a hedged check is sketched below.
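
A minimal sketch of how one might check and act on that guess from the command line; the host, flags and init script are assumptions, and the exact output of these commands varies between nodetool versions.

# does this node see the ring at all? an empty or error-ing ring supports the guess above
nodetool -h localhost ring
# re-enable gossip and, as described above, restart the instance
nodetool -h localhost enablegossip
/etc/init.d/cassandra restart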

If you think this analysis is not accurate or want to provide more information, please do so by commenting below. Thank you.

Sunday, January 18, 2015

how to improve apache cassandra 1.0.8 read speed

This article is about improving read speed in apache cassandra 1.0.8. Because the read improvement is determined by many factors, we will investigate all the likely areas so the gains add up collectively. You may experiment with these factors and alter them to suit your node environment to achieve the best result. When cassandra 1.0 was released, the official announcement cited read performance improvements of up to 400%!

First and foremost, the numerous articles which I use as references carry their own copyright; I take no ownership of nor credit for their hard work, as that rightfully belongs to them entirely. I only reference their work to improve my knowledge and to help people (like me) who need help and come to read what I share in my article.

Let's split these improvements into two parts, the hardware and the software.

hardware

ssd

An ssd is faster than an hdd for reads by orders of magnitude; please read cassandra-benchmark for the benchmark. Although the cassandra version used there was 0.8.10, reads gained a tremendous improvement when cassandra 1.0 was released, so these two improvements should compound. It is also recommended to read the aforementioned article, as it explains why random seeks hurt read performance on hdd disks.

multiple disks

The disk allocated to the commit log should be different from the one holding the data directory, because during writes data is repeatedly appended to the commit log. If the data directory is located on a different drive, a read performance gain should be visible.

 

software

memtable
If the write behaviour involves a lot of updates, it is good to look into the memtable settings. There are two settings you can start with

  • memtable_total_space_in_mb

  • commitlog_total_space_in_mb


More memory for these settings means frequent writes (updates) will be absorbed in memory, and thus reads will be fast too, since a read checks the memtable first before going to the sstables. But because this has a system-wide impact, you might want to increase it gradually and measure the effect. Read the links below for more information on what these two settings are and how to tune them; a hedged cassandra.yaml check follows the links.
http://www.datastax.com/docs/1.0/operations/tuning#memtable-sizing
http://www.datastax.com/docs/1.0/configuration/node_configuration#memtable-total-space-in-mb
http://www.slideshare.net/driftx/cassandra-summit-2010-performance-tuning slide 14 and slide 26
http://wiki.apache.org/cassandra/MemtableThresholds
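
A minimal sketch for inspecting the two settings; the config path conf/cassandra.yaml and the example value are assumptions, not recommendations.

# hedged sketch: check the two memtable related settings in cassandra.yaml
grep -E 'memtable_total_space_in_mb|commitlog_total_space_in_mb' conf/cassandra.yaml
# e.g. one might try memtable_total_space_in_mb: 4096 on a 14GB heap (an illustrative value only),
# then watch flush frequency and read latency before changing it further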

WP-DataStax-Cassandra page 16
Specifically, for read performance, Cassandra 1.0 optimizes queries by using a lighter-weight data structure for representing a row fragment from a read, than for a row fragment in a memtable into which updates accumulates. Also, with named reads, Cassandra 1.0 includes enhancements for deserializing the most recent versions of requested columns. Combined with the other optimizations, this makes reads in Cassandra as fast as writes for many workloads.

data compression

Previously I have done a study on how compression affects and improves reads; read here, here, here and here. Please read those links, as they provide a more comprehensive explanation than I could give here.

compaction

Compaction can improve (or hurt) the read speed. Citation from WP-DataStax-Cassandra:
The above process produces exceptionally fast write operations; however it also can lead to data fragmentation across the disk. Read requests may have to combine data from many SStables as well as Memtables to satisfy end user requests for data, and this can increase query response times.

To reduce data fragmentation and reclaim space taken by obsolete data, Cassandra performs "compactions" that merge the most recent data from many different SStables on disk into a new one.

In my experience, if you trigger a major compaction through nodetool, the read latency will increase while the compaction runs, but once the compaction is done, the read performance improves.

This documentation explains the different compaction strategies to use for read or write heavy workloads. So identify your environment's write and read patterns and always measure them, so you know what could go wrong and when. Choose a compaction strategy to suit your data model; and if cassandra is not strong at something, consider another big data technology. Read here for a bad experience encountered.

sstables counts

Keep the sstable count as low as possible for a column family. Excerpt from FULLTEXT02 page 39.
If a read operation is performed, initially the data are read from the memtable. If data are not in the memtable, then data get read from SSTable. Multiple SSTables may be looked up to find the data. Reading directly from SSTable decreases the performance because there are many SSTable that might need to be looked at hence requires an I/O operation means it requires touching the disk. Compared to SSTable, reading directly from memtable is fast because there is no I/O involved. The more the I/O operations are involved, the more performance will be degraded. Performance can also be increased by increasing the size of memtable [7].

Cassandra uses Bloom filter to judge quickly whether the key exists in the SSTable or not before touching the disk. Bloom filter is a efficient data structure that checks whether element is a member of a set by dividing the memory into buckets. Check each bucket to see if a key is present and if any bucket is empty then key was never inserted before. If there are many SSTables, then lots of I/O operations would be needed to read the data which can definitely decrease the performance. This is because of the fact that I/O operations are expensive and therefore compaction is used to improve read performance. Compaction merge two SSTables and sort to become one SSTable, which eventually decreases the number of SSTables and number of I/O operations, hence increasing the performance [7].

key / row cache

The key cache should be enabled to reduce searches that touch the disk, especially spinning disks. Excerpt from FULLTEXT02 page 47.
By default key cache is enabled and Cassandra caches 20,000 keys per Column Family (CF). The key cache decreases the Input/Output (I/O) operations because if key cache is not enabled then I/O operation is required in order to figure out the exact location of the row. Key cache holds the exact location of the data belonging to that key.

 

The row cache holds the entire content of the row in cache. By default, the row cache is disabled. The overhead of enabling or increasing the row cache is that it may require more Java Virtual Machine (JVM) heap for Cassandra. But if the jna lib is available, then storing the row cache off heap is a good option. This article has a diagram of how a read is performed.

concurrent_read

Excerpt from FULLTEXT02 page 48.
Read performance can also be increased by tuning the concurrent reads. The rule is span 4 threads per Central Processing Units (CPU) core in the cluster. The higher the number of threads spanned for read, the higher performance can be achieved if the machines have got faster I/O.

A word of caution: I tried increasing concurrent_reads from 32 to 64 and saw some unpredictable behaviour, so it is better to try this in a test environment first; a hedged cassandra.yaml check is below.
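
A minimal sketch for checking the current value; the config path is an assumption, and the sizing comment is general guidance rather than a rule taken from this cluster.

# hedged sketch: inspect the read/write concurrency settings in cassandra.yaml
grep -E '^concurrent_reads|^concurrent_writes' conf/cassandra.yaml
# adjust conservatively (e.g. 32 -> 48 rather than 32 -> 64), restart the node,
# and compare read latency in your monitoring before going further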

decreasing read consistency level

If your business requirements can tolerate eventual consistency, then decreasing the read consistency level from quorum to one will improve read speed, as a single node's acknowledgement is sufficient to fulfill the read request, compared to waiting for a quorum of nodes.

turn off swap space

When the node starts to swap due to a shortage of memory, the degraded response of the node, be it for writes or reads, will be visible. Hence it is best to turn off swap and let the operating system or the jvm kill the process on out-of-memory rather than letting page swapping start; a hedged example is below.
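
A minimal sketch, assuming a Linux node with root access (an assumption):

# hedged sketch: disable swap so the cassandra jvm is never paged out
sudo swapoff -a                      # turn off active swap now
# also remove or comment out the swap entries in /etc/fstab so it stays off after a reboot
# alternatively, reduce the kernel's tendency to swap:
sudo sysctl vm.swappiness=1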

java heap

Citation from OS-8.1.3-Cassandra Installation and Configuration Guide page 33
HEAP_NEWSIZE : Size of young generation. Larger value leads to longer GC pause times while smaller value will typically lead to more expensive GC. Set in conjunction with MAX_HEAP_SIZE.

So tune it carefully, since this is pretty low level. Read this article, as it mentions a few garbage collector settings for cassandra and the memory footprint.

upgrade

Each software release improves on or fixes previous defects, and so does cassandra. If an upgrade is viable, you should consider it. For instance, to quote Aaron Morton:
1.0 has key and row caches defined per CF, 1.1 has global ones which are better utilised and easier to manage. 1.2 moves bloom filters and compression meta off heap which reduces GC, which will help.  Things normally get faster.

This is also true.

monitoring

Because increases and/or decreases in data load will impact the read response time, it is vital to have a monitoring service running. As cited from this paper, p1724_tilmannrabl_vldb2012, page 1:
In modern enterprise systems it is not uncommon to have thousands of different metrics that are reported from a single host machine.

So monitoring crucial metrics for cassandra, for example cpu, java heap and io, should give some indication when your read speed has degraded.

Whilst this is knowledge collected from publicly and freely shared sources, any mistakes and errors in this article are mine alone and do not reflect on those sources. Thank you and I hope you learned something.

Saturday, January 17, 2015

Investigate into Apache Cassandra Memtable updateLiveRatio logging output

Today, we are going to study how an apache cassandra 1.0.8 memtable logs its statistics in the cassandra system.log. Example below:
WARN [MemoryMeter:1] 2014-10-17 07:38:15,346 Memtable.java (line 176) setting live ratio to minimum of 1.0 instead of 0.16714977001091136
INFO [MemoryMeter:1] 2014-10-17 07:38:15,346 Memtable.java (line 186) CFS(Keyspace='OpsCenter', ColumnFamily='pdps') liveRatio is 1.6474114033116056 (just-counted was 1.0). calculation took 3ms for 595 columns

What does the above logging output mean? Is the cassandra instance operating normally, given that one logging level is warn? Before we dive into the code to understand its meaning, let's first read and understand what a memtable is in the cassandra context.

Excerpts from datastax documentation

Cassandra is optimized for write throughput. Cassandra writes are first written to a commit log (for durability), and then to an in-memory table structure called a memtable. A write is successful once it is written to the commit log and memory, so there is very minimal disk I/O at the time of write. Writes are batched in memory and periodically written to disk to a persistent table structure called an SSTable (sorted string table). Memtables and SSTables are maintained per column family. Memtables are organized in sorted order by row key and flushed to SSTables sequentially (no random seeking as in relational databases).

Whenever the method updateLiveRatio() from class Memtable is called, the following code applies.
public void updateLiveRatio()
{
if (!MemoryMeter.isInitialized())
{
// hack for openjdk. we log a warning about this in the startup script too.
logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
cfs.liveRatio = 10.0;
return;
}

Runnable runnable = new Runnable()
{
public void run()
{
activelyMeasuring = Memtable.this;

long start = System.currentTimeMillis();
// ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
// So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
long deepSize = meter.measure(columnFamilies);
int objects = 0;
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
{
deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
objects += entry.getValue().getColumnCount();
}
double newRatio = (double) deepSize / currentThroughput.get();

if (newRatio < MIN_SANE_LIVE_RATIO)
{
logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
newRatio = MIN_SANE_LIVE_RATIO;
}
if (newRatio > MAX_SANE_LIVE_RATIO)
{
logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
newRatio = MAX_SANE_LIVE_RATIO;
}
cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);

logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
activelyMeasuring = null;
}
};

try
{
meterExecutor.submit(runnable);
}
catch (RejectedExecutionException e)
{
logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
}
}

A new runnable is submitted to the executor meterExecutor. If this executor is not busy, it will start to measure the column family associated with this memtable. There are two essential metrics involved: the key size plus value size, and the column count. deepSize is the summation over the column family, its keys and its values. A new variable, newRatio, is calculated as deepSize divided by currentThroughput.

The valid newRatio range is between 1.0 and 64.0 inclusive. When the calculated newRatio is less than 1.0, the first log line shown above appears in the cassandra system log and newRatio is reset to 1.0. The same check applies when newRatio exceeds 64.0: it is logged and newRatio is reset to the maximum valid value of 64.0. Then the column family's live ratio is updated to whichever is higher, the current value or the newly calculated newRatio.

So technically, this is nothing to really be concerned about. It is a measurement of how the in-memory size of the memtable compares to the throughput written so far. Also, given the current throughput, an estimate of the memtable's live size can be calculated based on the liveRatio. As a side note, this method updateLiveRatio() is called after a mutation has been applied to the memtable but before a flush is requested.

That's it for today, I hope you learned something.

Friday, January 16, 2015

operate cassandra using jmx in the terminal, including changing pool size, compacting sstables and key cache

If you operate an apache cassandra cluster and the load per node gets huge (like nodetool info showing 800GB), compactions become a problem. It is a big problem for apache cassandra 1.0.8 if your average load per node hovers around 600GB to 1TB. Read performance suffers and at times the system load goes high. In some instances, I noticed that when a repair is running, the system load goes above 20. It is not a concern if everything is operating well, but the more often you see this, the more likely something has gone wrong. Today, I will share my experience on how to operate cassandra when the node load is huge and the cassandra instance is still running. Often there are nice methods exposed via jmx, but for operating remotely, a jmx gui client such as jconsole is not ideal. Instead, we will use jmxterm for these operations in apache cassandra 1.0.8. So let's get started.

Changing pool size

So, it is pretty simple: launch jmxterm, set the bean, and then set the CorePoolSize. The steps are illustrated below.
$ java -jar jmxterm-1.0-alpha-4-uber.jar
$>open localhost:7199
#Connection to localhost:7199 is opened
$>bean org.apache.cassandra.request:type=ReplicateOnWriteStage
#bean is set to org.apache.cassandra.request:type=ReplicateOnWriteStage
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 32;
$>info
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage
#class name = org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor
# attributes
%0 - ActiveCount (int, r)
%1 - CompletedTasks (long, r)
%2 - CorePoolSize (int, rw)
%3 - CurrentlyBlockedTasks (int, r)
%4 - PendingTasks (long, r)
%5 - TotalBlockedTasks (int, r)
#there's no operations
#there's no notifications
$>set CorePoolSize 64
$>get CorePoolSize
#mbean = org.apache.cassandra.request:type=ReplicateOnWriteStage:
CorePoolSize = 64;

Alter key cache

Often, when there is heap pressure in the jvm, the safety valve kicks in and reduces the cache capacity. You can restart the cassandra instance, or you can reset the key cache capacity back to its initial value. Assuming your column family is named FooBar and the keyspace is just4fun, the following steps illustrate how this is done.
$>bean org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#bean is set to org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
$>info
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches
#class name = org.apache.cassandra.cache.AutoSavingKeyCache
# attributes
%0 - Capacity (int, rw)
%1 - Hits (long, r)
%2 - RecentHitRate (double, r)
%3 - Requests (long, r)
%4 - Size (int, r)
#there's no operations
#there's no notifications
$>
$>get Size
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Size = 122307;

$>set Capacity 250000
#Value of attribute Capacity is set to 250000
$>get Capacity;
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
$>get Capacity
#mbean = org.apache.cassandra.db:cache=FooBarKeyCache,keyspace=just4fun,type=Caches:
Capacity = 250000;

Compact sstable

Lastly, compacting sstables. It is amazing that we have an sstable as huge as 84GB! Triggering a major compaction is not an option here; often, when the load per node goes beyond 600GB, compaction takes forever, as GC kicks in and the cpu keeps collecting the heap, making the system load go high. So here we will select one huge sstable and compact only that one. You can also select a few sstables and compact them together, separated by commas.
$>bean org.apache.cassandra.db:type=CompactionManager
#bean is set to org.apache.cassandra.db:type=CompactionManager
$>run forceUserDefinedCompaction just4fun FooBar-hc-5-Index.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#RuntimeMBeanException: java.lang.IllegalArgumentException: FooBar-hc-5-Index.db does not appear to be a data file
$>run forceUserDefinedCompaction just4fun FooBar-hc-401-Data.db
#calling operation forceUserDefinedCompaction of mbean org.apache.cassandra.db:type=CompactionManager
#operation returns:
null

The compaction should now have started; you can check the cassandra system log or nodetool compactionstats, as sketched below. So that's it, I hope you learned something.
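
A minimal example of following the compaction from the shell; the log path is an assumption.

# hedged example: watch the user defined compaction we just triggered
nodetool -h localhost compactionstats
tail -f /var/log/cassandra/system.log | grep -i compact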

Saturday, January 3, 2015

apache cassandra 1.0.8 IncompatibleClassChangeError vtable stub and AssertionError Added column does not sort as the last column

Today we will spend some time looking into two errors and see whether they are really something to be concerned about. The errors were thrown while apache cassandra version 1.0.8 was running. Okay, let's go to the first error.
ERROR [ReadStage:1559] 2012-10-16 20:38:25,336 AbstractCassandraDaemon.java (line 139) Fatal exception in thread Thread[ReadStage:1559,5,main]
java.lang.IncompatibleClassChangeError: vtable stub
at org.apache.cassandra.db.AbstractColumnContainer.getColumn(AbstractColumnContainer.java:134)
at org.apache.cassandra.db.Memtable$6.computeNext(Memtable.java:402)
at org.apache.cassandra.db.Memtable$6.computeNext(Memtable.java:384)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at org.apache.cassandra.db.CollationController.collectTimeOrderedData(CollationController.java:93)
at org.apache.cassandra.db.CollationController.getTopLevelColumns(CollationController.java:62)
at org.apache.cassandra.db.ColumnFamilyStore.getTopLevelColumns(ColumnFamilyStore.java:1298)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1184)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1151)
at org.apache.cassandra.db.Table.getRow(Table.java:375)
at org.apache.cassandra.db.SliceByNamesReadCommand.getRow(SliceByNamesReadCommand.java:58)
at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:765)
at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1224)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

I guess this really is a fatal error. Let's check out what IncompatibleClassChangeError means. From the IncompatibleClassChangeError javadoc:

Thrown when an incompatible class change has occurred to some class definition. The definition of some class, on which the currently executing method depends, has since changed.

Honestly, this is really odd: this error was thrown out of nowhere and the jvm stopped. I am sure nobody changed the compiled apache cassandra code while the node instance was running. It certainly sounds odd, but I guess when the data held per node is huge, strange things start to happen. Now, let's get into the stack trace and follow its path.

The bottom three stack frames are pretty obvious: a new thread was started and executed by the thread pool executor. Then we have the class StorageProxy. Within this class there is a static class LocalReadRunnable which implements the abstract method runMayThrow. It seems like it is trying to read a table row on the local node. The table eventually calls into the column family store to retrieve the columns. Tracing even deeper, line 134 of class AbstractColumnContainer reveals that no exception is thrown from there. This is like a mystery! :) The stack trace analysis and the observed jvm stop show that something is wrong. I am not sure what went wrong, but if you have any idea, please discuss it in a comment below.

Next we look at the second error.
ERROR [CompactionExecutor:4] 2014-10-22 06:13:00,884 AbstractCassandraDaemon.java (line 139) Fatal exception in thread Thread[CompactionExecutor:4,1,main]
java.lang.AssertionError: Added column does not sort as the last column
at org.apache.cassandra.db.ArrayBackedSortedColumns.addColumn(ArrayBackedSortedColumns.java:126)
at org.apache.cassandra.db.AbstractColumnContainer.addColumn(AbstractColumnContainer.java:129)
at org.apache.cassandra.db.AbstractColumnContainer.addColumn(AbstractColumnContainer.java:124)
at org.apache.cassandra.db.ColumnFamilySerializer.deserializeColumns(ColumnFamilySerializer.java:148)
at org.apache.cassandra.io.sstable.SSTableIdentityIterator.getColumnFamilyWithColumns(SSTableIdentityIterator.java:232)
at org.apache.cassandra.db.compaction.PrecompactedRow.merge(PrecompactedRow.java:110)
at org.apache.cassandra.db.compaction.PrecompactedRow.<init>(PrecompactedRow.java:97)
at org.apache.cassandra.db.compaction.CompactionController.getCompactedRow(CompactionController.java:137)
at org.apache.cassandra.db.compaction.CompactionIterable$Reducer.getReduced(CompactionIterable.java:102)
at org.apache.cassandra.db.compaction.CompactionIterable$Reducer.getReduced(CompactionIterable.java:87)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.consume(MergeIterator.java:118)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:101)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at com.google.common.collect.Iterators$7.computeNext(Iterators.java:614)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at org.apache.cassandra.db.compaction.CompactionTask.execute(CompactionTask.java:173)
at org.apache.cassandra.db.compaction.CompactionManager$1.call(CompactionManager.java:135)
at org.apache.cassandra.db.compaction.CompactionManager$1.call(CompactionManager.java:115)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Again, this error popped out of nowhere; during node operation, the exception was just thrown. So now we will look into what happened using the given stack trace. Similar to the above, the bottom three stack frames are pretty obvious: a new thread was started and executed by the thread pool executor. It is a compaction thread, and while compacting sstables, things break. Rows get reduced and columns get deserialized, and while the columns are being reassembled, an exception is thrown because a column does not sort as the last column. This is another strange phenomenon: the columns should have been sorted already, but apparently they are not. The javadoc for the method addColumn:

AddColumn throws an exception if the column added does not sort after the last column in the map. The reasoning is that this implementation can get slower if too much insertions are done in unsorted order and right now we only use it when *all* insertion (with this method) are done in sorted order. The assertion throwing is thus a protection against performance regression without knowing about (we can revisit that decision later if we have use cases where most insert are in sorted order but a few are not).

It seems this could probably be reproduced in a development environment by inserting a few columns out of order. I guess this is not as fatal as the previous error. That's it for this analysis; please give your inputs or comments below if you have a workaround.

UPDATE:

It seems that the second error was fixed in https://issues.apache.org/jira/browse/CASSANDRA-5856

Sunday, December 21, 2014

apache cassandra 1.0.8 out of memory error unable to create new native thread

If you are using apache cassandra 1.0.8 and seeing an exception such as the one below, you may want to read further. Today, we will investigate what this error means and what we can do to correct the situation.
ERROR [Thread-273] 2012-14-10 16:33:18,328 AbstractCassandraDaemon.java (line 139) Fatal exception in thread Thread[Thread-273,5,main]
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:640)
at java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(ThreadPoolExecutor.java:727)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:657)
at org.apache.cassandra.thrift.CustomTThreadPoolServer.serve(CustomTThreadPoolServer.java:104)
at org.apache.cassandra.thrift.CassandraDaemon$ThriftServer.run(CassandraDaemon.java:214)

This is not good; the application crashed with this error during operation. To describe the environment: it is running Oracle Java 6 with apache cassandra 1.0.8, with 12GB of Java heap assigned, a thread stack size of 128k, max user processes at 260000 and open files capped at 65536.

Investigating the Java stack trace reveals that this error is not thrown by Java code but by native code. Below is the trace path.

  1. https://github.com/apache/cassandra/blob/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java#L214

  2. https://github.com/apache/cassandra/blob/cassandra-1.0.8/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java#L104

  3. ThreadPoolExecutor.java line 657 (screenshot of the Oracle JDK source)

  4. ThreadPoolExecutor.java line 727 (screenshot of the Oracle JDK source)

  5. Thread.java line 640 (screenshot of the Oracle JDK source)


A little explanation before we delve deeper. Numbers 3 to 5 are JDK dependent; hence, if you are using OpenJDK, the line numbers may differ. As mentioned earlier, I'm using the Oracle JDK. Unfortunately, its source is not available online for browsing, but you can download it from the Oracle site.

Because this is a native call, we will look into code that is not Java. If the following code looks alien to you, it looks alien to me too, as it is written in C++. Also note that this code is taken from OpenJDK and is not found in the Oracle JDK; probably it is closed source, but we will not go there. Let's just focus on where this error is thrown from and why. It is taken from here, and the explanation is here.
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,
  // due to rank ordering issues. Example: we might need to grab the
  // Heap_lock while we construct the exception.
  bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event
  // in Thread::start.
  {
    // Ensure that the C++ Thread and OSThread structures aren't freed before
    // we operate.
    MutexLocker mu(Threads_lock);

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent
    // re-starting an already started thread, so we should usually find
    // that the JavaThread is null. However for a JNI attached thread
    // there is a small window between the Thread object being created
    // (with its JavaThread set) and the update to its threadStatus, so we
    // have to check for this
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
      // We could also check the stillborn flag to see if this thread was already stopped, but
      // for historical reasons we let the thread detect that itself when it starts running

      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // Allocate the C++ Thread structure and create the native thread. The
      // stack size retrieved from java is signed, but the constructor takes
      // size_t (an unsigned type), so avoid passing negative values which would
      // result in really large stacks.
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the
      // JavaThread due to lack of memory. Check for this situation and throw
      // an exception if necessary. Eventually we may want to change this so
      // that we only grab the lock if the thread was created successfully -
      // then we can also do this check and throw the exception in the
      // JavaThread constructor.
      if (native_thread->osthread() != NULL) {
        // Note: the current thread is not being used within "prepare".
        native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {
    THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {
    // No one should hold a reference to the 'native_thread'.
    delete native_thread;
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        "unable to create new native thread");
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              "unable to create new native thread");
  }

  Thread::start(native_thread);

JVM_END

As I don't have knowledge of C++, there is no deep analysis of the snippet above, but if you understand what it does, I will be happy if you can give your analysis as a comment below this article. It certainly looks like the operating system cannot create a thread at this point, and the JVM posts a resource-exhausted event flagged with JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR and/or JVMTI_RESOURCE_EXHAUSTED_THREADS before throwing the OutOfMemoryError. Let's google to find out what that is supposed to mean. Below are some findings which are interesting.

To summarize the analysis from the links above.

  • a stack is created for every thread, so as more threads are created, the total memory used by stacks increases as well.

  • A Java Virtual Machine stack stores frames. A Java Virtual Machine stack is analogous to the stack of a conventional language such as C: it holds local variables and partial results, and plays a part in method invocation and return.

  • the Java thread stacks are not part of the Java heap; hence, even if you increase the Java heap given to cassandra via the -Xms or -Xmx parameters, this error will happen again if the same condition is met in the future.

  • If Java Virtual Machine stacks can be dynamically expanded, and expansion is attempted but insufficient memory can be made available to effect the expansion, or if insufficient memory can be made available to create the initial Java Virtual Machine stack for a new thread, the Java Virtual Machine throws an OutOfMemoryError.


From the analysis so far, it certainly looks like the cassandra instance was trying to create a new thread but was not able to, because the underlying operating system could not allocate it. In other words, the operating system does not have sufficient native memory to create the thread, hence increasing -Xms or -Xmx will not solve the problem. Note that the file descriptor and process limits are not the bottleneck here either, as those limits are set very high.
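
A back-of-the-envelope sketch of why this is a native-memory problem rather than a heap problem, using the 128k stack size from the environment above and a hypothetical thread count:

 public class NativeStackBudget {
     public static void main(String[] args) {
         long stackSizeBytes = 128L * 1024;   // -Xss128k, as in the environment described above
         long threads = 10000;                // hypothetical number of live threads
         long totalStackBytes = stackSizeBytes * threads;
         // every one of these bytes lives outside -Xmx, next to the 12GB heap
         System.out.printf("%,d threads x 128k stack = %,d MB of native memory%n",
                 threads, totalStackBytes / (1024 * 1024));
     }
 }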

It's interesting to note that when such an error is thrown, one suggested fix is to decrease -Xss or even the heap settings -Xms and -Xmx. The logic is that a smaller heap and smaller per-thread stacks leave more native memory and address space for the operating system to allocate new thread stacks. Perhaps you should try it, but if a cassandra node already has high heap usage, decreasing the heap will only create another type of problem.
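
If you want to see the error outside of cassandra, the minimal sketch below (plain Java, nothing cassandra-specific, and something to run only on a disposable test machine) keeps starting parked threads until the JVM can no longer allocate a native stack and throws the same OutOfMemoryError:

 public class ThreadExhaustionDemo {
     public static void main(String[] args) {
         long count = 0;
         try {
             while (true) {
                 Thread t = new Thread(new Runnable() {
                     public void run() {
                         try {
                             Thread.sleep(Long.MAX_VALUE);   // keep the thread (and its stack) alive
                         } catch (InterruptedException ignored) {
                         }
                     }
                 });
                 t.setDaemon(true);
                 t.start();
                 count++;
             }
         } catch (OutOfMemoryError e) {
             // thrown from native code, exactly like the cassandra log above
             System.out.println("Created " + count + " threads before: " + e.getMessage());
         }
     }
 }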

If you have encountered such a problem before and have a good fix, please leave a comment below this article. To end this article, as of this writing there is a discussion happening on the cassandra mailing list.

Sunday, November 23, 2014

Investigate into why key cache in apache cassandra 1.0.8 gets reduced

Today, we will investigate when and why apache cassandra 1.0.8 reduces the configured key cache. If you run the command nodetool cfstats, one of the statistics will probably interest you. I paste the snippet below.
Key cache capacity: 200000
Key cache size: 200000
Key cache hit rate: 0.9655797101449275
Row cache: disabled

After the cassandra instance has been running for some time, you may start to notice that the key cache capacity has gone down.
Key cache capacity: 150000
Key cache size: 150000
Key cache hit rate: 0.962251615630851
Row cache: disabled

As seen above, the initial key cache capacity for this column family is 200,000 keys, and the cache is fully occupied (all 200,000 entries). The hit rate is about 96%, which is a very good statistic. So after a while, what happened and why was the capacity reduced? Let's investigate the log file.
 WARN [ScheduledTasks:1] 2014-02-02 00:46:46,384 AutoSavingCache.java (line 187) Reducing MyColumnFamily KeyCache capacity from 200000 to 150000 to reduce memory pressure

Apparently memory was not sufficient at that point in time, and the key cache was reduced to free up memory for the cassandra instance. Let's look at the cassandra yaml file to see if there is any description of this behaviour.
# emergency pressure valve #2: the first time heap usage after a full
# (CMS) garbage collection is above this fraction of the max,
# Cassandra will reduce cache maximum _capacity_ to the given fraction
# of the current _size_. Should usually be set substantially above
# flush_largest_memtables_at, since that will have less long-term
# impact on the system.
#
# Set to 1.0 to disable. Setting this lower than
# CMSInitiatingOccupancyFraction is not likely to be useful.
reduce_cache_sizes_at: 0.85
reduce_cache_capacity_to: 0.6

There are two configurations that govern this: when heap usage after a full garbage collection exceeds 85% of the maximum, cache capacity is reduced to 60% of its current size. So now we dive deeper into the code to see what happens. Let's read the class GCInspector.
double usage = (double) memoryUsed / memoryMax;

if (memoryUsed > DatabaseDescriptor.getReduceCacheSizesAt() * memoryMax && !cacheSizesReduced)
{
    cacheSizesReduced = true;
    logger.warn("Heap is " + usage + " full. You may need to reduce memtable and/or cache sizes. Cassandra is now reducing cache sizes to free up memory. Adjust reduce_cache_sizes_at threshold in cassandra.yaml if you don't want Cassandra to do this automatically");
    StorageService.instance.reduceCacheSizes();
}

The condition is true when the memory used after a full collection is greater than reduce_cache_sizes_at (configured in cassandra.yaml, value 0.85) multiplied by the maximum heap memory, and the caches have not been reduced before. For example, if the JVM is assigned an 8GB heap, the if statement evaluates to true once post-GC memory usage exceeds 6.8GB and the cache sizes have not yet been reduced.
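
A minimal sketch of that arithmetic (not Cassandra code), assuming the hypothetical 8GB heap from the example and the 0.85 threshold:

 public class ReduceCacheSizesAtCheck {
     public static void main(String[] args) {
         long memoryMax = 8L * 1024 * 1024 * 1024;     // -Xmx, 8GB (hypothetical)
         double reduceCacheSizesAt = 0.85;             // cassandra.yaml value
         boolean cacheSizesReduced = false;

         long memoryUsed = 7L * 1024 * 1024 * 1024;    // heap used after a full CMS collection
         if (memoryUsed > reduceCacheSizesAt * memoryMax && !cacheSizesReduced) {
             cacheSizesReduced = true;                 // the valve only triggers once
             System.out.println("7GB used > 6.8GB threshold: reducing cache capacities");
         }
     }
 }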

When the condition becomes true, StorageService starts reducing the cache sizes: a simple for loop over all column families reduces each cache. As seen there, two caches are being reduced, the row cache and the key cache. Because we did not enable the row cache and it is not the focus of this study, I'll leave it as an exercise for you. The investigation continues into keyCache.reduceCacheSize(), shown in the snippet below.
public void reduceCacheSize()
{
    if (getCapacity() > 0)
    {
        int newCapacity = (int) (DatabaseDescriptor.getReduceCacheCapacityTo() * size());
        logger.warn(String.format("Reducing %s %s capacity from %d to %s to reduce memory pressure",
                                  cfName, cacheType, getCapacity(), newCapacity));
        setCapacity(newCapacity);
    }
}

So if the capacity was initially assigned a value larger than 0, a new capacity is set. The new capacity is reduce_cache_capacity_to (cassandra.yaml default 0.60) multiplied by the current size of the cache. For example, if the cache currently holds 20,000 entries, the new cache capacity will be 20,000 x 0.60 = 12,000.
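
A minimal sketch of that capacity calculation (not Cassandra code), using the same example numbers as above:

 public class ReduceCacheCapacity {
     public static void main(String[] args) {
         double reduceCacheCapacityTo = 0.60;   // cassandra.yaml default
         int currentSize = 20000;               // entries currently held in the key cache (example value)

         int newCapacity = (int) (reduceCacheCapacityTo * currentSize);
         System.out.println("Reducing KeyCache capacity from " + currentSize + " to " + newCapacity); // 20000 -> 12000
     }
 }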

This wraps up the investigation. Final thought: because memory consumption exceeded a certain threshold, this emergency pressure valve kicked in. The immediate fix is to increase the heap of the cassandra instance, but the more correct fix is probably to reduce the load per node or add nodes to the cluster. When cache capacity is reduced, expect reads to become slower too; in a data store, speed and performance are everything, and a reduced cache definitely has an impact on the cluster.

Friday, July 18, 2014

Cassandra discarding obsolete commit log

Often I notice the log showing the following line in apache cassandra 1.0.8:
INFO [COMMIT-LOG-WRITER] 2014-06-30 08:58:55,039 CommitLog.java (line 490) Discarding obsolete commit log:CommitLogSegment(/mnt/cassandra/commitlog/CommitLog-1404095354976.log)

It looks like nothing to worry about since the log level is INFO. However, I see it often, so let's check to understand what it really means.

This log is written by CommitLog.java.
private void maybeDiscardSegment(CommitLogSegment segment, Iterator<CommitLogSegment> iter)
{
    if (segment.isSafeToDelete() && iter.hasNext())
    {
        logger.info("Discarding obsolete commit log:" + segment);
        FileUtils.deleteAsync(segment.getPath());
        // usually this will be the first (remaining) segment, but not always, if segment A contains
        // writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.
        iter.remove();
    }
    else
    {
        if (logger.isDebugEnabled())
            logger.debug("Not safe to delete commit log " + segment + "; dirty is " + segment.dirtyString() + "; hasNext: " + iter.hasNext());
    }
}

So why discard a segment? From the code documentation:

Commit Log tracks every write operation into the system. The aim of the commit log is to be able to successfully recover data that was not stored to disk via the Memtable. Every Commit Log maintains a header represented by the abstraction CommitLogHeader. The header contains a bit array and an array of longs and both the arrays are of size, #column families for the Table, the Commit Log represents.

Whenever a ColumnFamily is written to, for the first time its bit flag is set to one in the CommitLogHeader. When it is flushed to disk by the Memtable its corresponding bit in the header is set to zero. This helps track which CommitLogs can be thrown away as a result of Memtable flushes. Additionally, when a ColumnFamily is flushed and written to disk, its entry in the array of longs is updated with the offset in the Commit Log file where it was written. This helps speed up recovery since we can seek to these offsets and start processing the commit log.

Every Commit Log is rolled over everytime it reaches its threshold in size; the new log inherits the "dirty" bits from the old.

Over time there could be a number of commit logs that would be generated. To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs. If the result is 0, then it is safe to remove the older file. (Since the new CL inherited the old's dirty bitflags, getting a zero for any given bit in the anding means that either the CF was clean in the old CL or it has been flushed since the switch in the new.)
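
To make the "dirty bit" idea above concrete, here is a minimal sketch, not Cassandra code; the class and method names are invented for illustration:

 import java.util.BitSet;

 public class CommitLogSegmentSketch {
     private final BitSet dirtyCfs = new BitSet();   // one bit per column family id

     void markDirty(int cfId)   { dirtyCfs.set(cfId); }    // a CF was written into this segment
     void markFlushed(int cfId) { dirtyCfs.clear(cfId); }  // that CF's memtable was flushed to disk

     boolean isSafeToDelete() {
         // all bits zero means every CF recorded in this segment is either clean
         // or has been flushed since, so the segment holds nothing unrecoverable
         return dirtyCfs.isEmpty();
     }

     public static void main(String[] args) {
         CommitLogSegmentSketch segment = new CommitLogSegmentSketch();
         segment.markDirty(1);                          // a write to CF 1 lands in this segment
         System.out.println(segment.isSafeToDelete());  // false: CF 1 still has unflushed data
         segment.markFlushed(1);                        // memtable flush for CF 1
         System.out.println(segment.isSafeToDelete());  // true: segment can be discarded
     }
 }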

So that's pretty clear why a commit log segment gets removed. Tracing the call upward:
CommitLog.maybeDiscardSegment()
  ^
  |
  +--- CommitLog.discardCompletedSegmentsInternal()
         ^
         |
         +--- CommitLog.discardCompletedSegments()
                ^
                |
                +--- ColumnFamilyStore.maybeSwitchMemtable()
                |
                +--- ColumnFamilyStore.truncate()

So whenever a column family is truncated, the obsolete commit log segments are discarded (removed) as well, which makes sense. Likewise, when maybeSwitchMemtable is executed, that is, when a memtable is flushed, the segments it covered get discarded (removed) afterwards.

This logging message is just fine; it is part of normal cassandra operation. That's it for this article.

Monday, May 5, 2014

Investigate into nodetool cleanup in cassandra

In this article, we are going to study cassandra 1.0.8 nodetool cleanup. The nodetool help description says: "Run cleanup on one or more column family." That's clearly too general to understand what cleanup really does.

So right now we will trace the code to understand the cleanup operation.

Cleanup is actually a type of compaction. Tracing the code down, CompactionManager.doCleanupCompaction() is where the actual cleanup work is done.
/**
 * This function goes over each file and removes the keys that the node is not responsible for
 * and only keeps keys that this node is responsible for.
 *
 * @throws IOException
 */
private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, NodeId.OneShotRenewer renewer) throws IOException
{
    assert !cfs.isIndex();
    Table table = cfs.table;
    Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
    boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
    if (ranges.isEmpty())
    {
        logger.info("Cleanup cannot run before a node has joined the ring");
        return;
    }

    for (SSTableReader sstable : sstables)
    {
        CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), false);
        long startTime = System.currentTimeMillis();

        long totalkeysWritten = 0;

        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
                                               (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
        if (logger.isDebugEnabled())
            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);

        SSTableWriter writer = null;
        SSTableReader newSstable = null;

        logger.info("Cleaning up " + sstable);
        // Calculate the expected compacted filesize
        long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
        String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
        if (compactionFileLocation == null)
            throw new IOException("disk full");

        SSTableScanner scanner = sstable.getDirectScanner();
        Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
        List<IColumn> indexedColumnsInRow = null;

        CleanupInfo ci = new CleanupInfo(sstable, scanner);
        executor.beginCompaction(ci);
        try
        {
            while (scanner.hasNext())
            {
                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                if (Range.isTokenInRanges(row.getKey().token, ranges))
                {
                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                    if (compactedRow.isEmpty())
                        continue;
                    writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
                    writer.append(compactedRow);
                    totalkeysWritten++;
                }
                else
                {
                    cfs.invalidateCachedRow(row.getKey());

                    if (!indexedColumns.isEmpty() || isCommutative)
                    {
                        if (indexedColumnsInRow != null)
                            indexedColumnsInRow.clear();

                        while (row.hasNext())
                        {
                            IColumn column = row.next();
                            if (column instanceof CounterColumn)
                                renewer.maybeRenew((CounterColumn) column);
                            if (indexedColumns.contains(column.name()))
                            {
                                if (indexedColumnsInRow == null)
                                    indexedColumnsInRow = new ArrayList<IColumn>();

                                indexedColumnsInRow.add(column);
                            }
                        }

                        if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                        {
                            // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
                            Table.switchLock.readLock().lock();
                            try
                            {
                                cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow);
                            }
                            finally
                            {
                                Table.switchLock.readLock().unlock();
                            }
                        }
                    }
                }
            }
            if (writer != null)
                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
        }
        catch (Exception e)
        {
            if (writer != null)
                writer.abort();
            throw FBUtilities.unchecked(e);
        }
        finally
        {
            scanner.close();
            executor.finishCompaction(ci);
        }

        List<SSTableReader> results = new ArrayList<SSTableReader>();
        if (newSstable != null)
        {
            results.add(newSstable);

            String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
            long dTime = System.currentTimeMillis() - startTime;
            long startsize = sstable.onDiskLength();
            long endsize = newSstable.onDiskLength();
            double ratio = (double)endsize / (double)startsize;
            logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100), totalkeysWritten, dTime));
        }

        // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
        cfs.indexManager.flushIndexesBlocking();

        cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
    }
}

With this method, it is very clear that keys (that is, rows) that do not belong to this node get removed. Following is a summary of what this method actually does; a simplified sketch of the core idea follows the list.

  • for this column family, get the ranges this node is responsible for.

  • for each sstable, expectedRangeFileSize is estimated as half of the expected compacted file size of that sstable, and is used to pick a data directory with enough free space.

  • for each sstable, a loop is performed with the following tasks:


  1. check whether there is enough disk space for the new compacted sstable.

  2. the executor begins the cleanup compaction.

  3. iterate over the rows in this sstable and check whether the row key token is within the ranges this node is responsible for.

  4. if it is, get the compacted row and append it to the SSTableWriter.

  5. if it is not, the row is invalidated in the cache, and any secondary index entries created for this row are also removed.


  • the cleanup compaction is completed by the executor.

  • cleanup compaction information is written to the log.

  • secondary indexes are flushed to disk.

  • the old sstable is replaced with the new one.
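
As mentioned above, here is a minimal sketch of the core idea, not Cassandra code; the Range and token types are simplified, hypothetical stand-ins for the real classes:

 import java.util.ArrayList;
 import java.util.List;

 public class CleanupSketch {
     // simplified stand-in for Cassandra's token range: half-open interval (left, right]
     static class Range {
         final long left, right;
         Range(long left, long right) { this.left = left; this.right = right; }
         boolean contains(long token) { return token > left && token <= right; }
     }

     static boolean isTokenInRanges(long token, List<Range> localRanges) {
         for (Range r : localRanges)
             if (r.contains(token)) return true;
         return false;
     }

     public static void main(String[] args) {
         List<Range> localRanges = new ArrayList<Range>();
         localRanges.add(new Range(0, 100));            // the ranges this node now owns

         long[] rowTokens = { 10, 42, 150, 400 };       // tokens of rows found in the old sstable
         List<Long> keptRows = new ArrayList<Long>();   // plays the role of the new sstable

         for (long token : rowTokens) {
             if (isTokenInRanges(token, localRanges)) {
                 keptRows.add(token);                   // appended to the SSTableWriter
             }
             // else: the row is simply not copied; its cached row and secondary index
             // entries would be invalidated, as in doCleanupCompaction above
         }
         System.out.println("Kept rows with tokens: " + keptRows); // [10, 42]
     }
 }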


That's it about cassandra cleanup. If you learned something and would like to contribute back, please go to the donation page for more information.