Sunday, March 29, 2015

My journey and experience on upgrading apache cassandra 1.0.8 to 1.0.12

At the request of a blog reader, today I will share my experience upgrading Apache Cassandra 1.0.8 to 1.0.12 on a live production cluster. If you also run or administer a Cassandra cluster, I hope you can learn from my experience and that it eases some of your worry or pain.

First, let's lay out the current architecture of this environment.

  • Java 6

  • 12-node cluster.

  • two spinning disks in RAID 0, 32GB total system memory of which 14GB is allocated to the Cassandra heap (800MB for the young generation), quad-core CPU.

  • pretty much stock cassandra.yaml, with a few differences: concurrent_writes set to 64, flush_largest_memtables_at set to 0.8, and compaction_throughput_mb_per_sec set to 64.

  • average load per node of 500-550GB.


As you can see, this is a pretty ancient Cassandra version as of this writing. But because Cassandra has been rock solid serving read/write requests for years, it stayed in this stable condition, and we took advantage of scaling out by adding nodes, from six to nine and eventually to twelve now. Disk failures do happen on the nodes of the cluster, but because Cassandra is designed with no single point of failure, we can afford to lose a single node while replacing it. Those were a few of the reasons we stayed with Cassandra 1.0 for quite some time.

Because we would probably like to go to Cassandra 2.0 and beyond, and Java 6 has been EOL for quite some time, it is wise to upgrade Java before Cassandra. Because the systems are integrated like an ecosystem, it is also wise to look at the Java version used by the client systems that send read/write requests to the Cassandra cluster. So make a checklist of the clients that integrate with the cluster, and then check which stable Java 7 release is currently available. Example:

Cassandra 1.0 (cassandra-1.0.12): Java 6 minimum.
https://github.com/apache/cassandra/tree/cassandra-1.0.12

Hector client uses Cassandra 2.0.4, so Java 7 minimum
https://github.com/hector-client/hector/blob/master/pom.xml

DataStax CQL driver uses Cassandra 2.1.2, so Java 7 minimum
https://github.com/datastax/java-driver/blob/2.1/pom.xml

Java 7 update release notes
http://www.oracle.com/technetwork/java/javase/7u-relnotes-515228.html

Java 7 features and enhancements
http://www.oracle.com/technetwork/java/javase/jdk7-relnotes-418459.html

java 7 in wiki http://en.wikipedia.org/wiki/Java_version_history#Java_SE_7_.28July_28.2C_2011.29

unicode
Before upgrading, check whether the data in Cassandra relies on a different Unicode version, because the supported version changed between Java releases: http://www.herongyang.com/Unicode/Java-Unicode-Version-Supported-in-Java-History.html
http://docs.oracle.com/javase/7/docs/technotes/guides/intl/enhancements.7.html
Early versions of the Java SE 7 release added support for Unicode 5.1.0. The final version of the Java SE 7 release supports Unicode 6.0.0. Unicode 6.0.0 is a major version of the Unicode Standard and adds support for over 2000 additional characters, as well as support for properties and data files.

At the time of checking, we picked Java 7 update 72. Upgrading Java 6 to Java 7 update 72 under Cassandra 1.0.8 was a painless process, just time consuming because the load per node is huge and the cluster has many nodes. I followed these steps for the Java upgrade on each Cassandra node.

upgrade java for all cassandra nodes
1. write a script that automatically installs Java 7 on a node, updates the Java thread stack size to 256k in cassandra-env.sh, and sets JAVA_HOME in cassandra.in.sh to Java 7.
2. execute the script in rolling fashion across all the nodes, upgrading one node at a time. For each node:
3. stop cassandra
4. execute the script.
5. start the cassandra instance
6.0 monitor the node after it is up, then check the monitoring system again after the node has been running for 30 minutes, 1 hour and 2 hours.
6.1 check that your client can read/write to that one upgraded node.

Now you can move on to the next node in the ring, and you can skip step 6.0 once you are sure it is going to work. One thing I observed is that the GC duration for Cassandra dropped by half going from Java 6 to Java 7! Faster GC could mean more CPU cycles to process other tasks and fewer stop-the-world pauses for the Cassandra instance.

Leave the cluster running on Java 7 for a day or two, and if all is okay, continue with the Cassandra upgrade. So which Cassandra version should you upgrade to? There are several guidelines I followed.
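The rolling steps above (stop, run the script, start, verify, one node at a time) can be sketched as a small driver loop. This is an illustration only, written in Java to match the other snippets on this blog. The NodeOps interface, its method names and the class name are hypothetical stand-ins for whatever scripts or tooling you use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-node operations; back these with your own scripts or tooling.
interface NodeOps
{
    void stopCassandra(String host) throws Exception;
    void runUpgradeScript(String host) throws Exception;
    void startCassandra(String host) throws Exception;
    boolean clientCanReadWrite(String host) throws Exception;
}

final class RollingUpgrade
{
    // Upgrades one node at a time and stops at the first node that fails the
    // read/write check, so the remaining nodes are left untouched.
    static List<String> run(List<String> hosts, NodeOps ops) throws Exception
    {
        List<String> upgraded = new ArrayList<String>();
        for (String host : hosts)
        {
            ops.stopCassandra(host);
            ops.runUpgradeScript(host);
            ops.startCassandra(host);
            if (!ops.clientCanReadWrite(host))
                throw new IllegalStateException("read/write check failed on " + host + "; upgraded so far: " + upgraded);
            upgraded.add(host);
        }
        return upgraded;
    }
}
```

Stopping at the first failed check is the point of the design: with one node down at a time, the rest of the ring keeps serving requests while you investigate.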

1. choose ONLY a STABLE release for a production cluster. How to choose? You should read this link.
2. read NEWS.txt and CHANGES.txt. From time to time, changes to the code base affect, for example, the sstable format, so pay attention, especially across a major Cassandra upgrade.
3. read the code differences between your current version and the version you decided to upgrade to. For this upgrade: https://github.com/apache/cassandra/compare/cassandra-1.0.8...cassandra-1.0.12
4. read the DataStax notes on upgrading nodes for a minor version.

I spent a lot of time on step 3. Reading the code differences shows what has been changed and/or added, so you can consider how it will impact your Cassandra environment. To upgrade further to Cassandra 1.1 later, you first need to upgrade to the latest version of the release line currently deployed. Example here. Once you have gone through the checkpoints above, you will probably have a lot of questions and TODOs, and those mean further work. For the next step, it is best to work through those questions and TODOs and verify the answers in a test cluster before applying anything to a production cluster.

For me, I have written a few bash scripts, for example the Java upgrade script mentioned above. I have also written a script that installs a test cluster for the Cassandra upgrade. Remember to also write a script that snapshots the data directory using nodetool, and another that automatically downgrades. When something goes wrong, you can revert using the automatic downgrade script and the backup from the nodetool snapshot. You will also need to save the configuration files, for example cassandra.in.sh, cassandra-env.sh, cassandra.yaml, and any others in your cluster environment.

With these scripts written and tested, it is best to get acknowledgement from management on whether to proceed. It would also be best to have someone with you who also administers Cassandra clusters, just for the good and bad moments. ;-) You can also reach me through the follow button on the home page. :)

upgrade cassandra from 1.0.8 to 1.0.12

  1. stop repair and cleanup on all nodes in the cluster.

  2. write a script to upgrade automatically, so that you don't panic or waste time and can stay composed during the node upgrade. Trust me, it saves you a lot of time and avoids human error. The script content could be the following:
    - download cassandra 1.0.12 and extract it, set file permissions, etc.
    - back up the current cassandra 1.0.8 data using nodetool snapshot. make sure you give the snapshot directory a name like MyKeyspace-1.0.8-date
    - drain the node.
    - stop cassandra if it is not yet stopped.
    - update cassandra 1.0.12 with your cluster settings.

  3. check the configuration changes and then start the new cassandra 1.0.12 instance.

  4. monitor the node after it is up, then check the monitoring system again after the node has been running for 30 minutes, 1 hour and 2 hours.

  5. check that your client can read/write to that one upgraded node.


Now you can move on to the next node in the ring, and you can skip step 4 once you are sure it is going to work. Because the sstable version changed in Cassandra 1.0.10, from hc to hd, it is best to have all sstables on all nodes on the hd version before performing the next major upgrade.

That's it for this article. It may not cover everything and may contain mistakes, so if you want to comment, please leave your comment below.
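To see which sstables are still on the old format, the version can be read from the data file name. A minimal sketch, assuming data files are named like MyColumnFamily-hc-6681-Data.db (column family, format version, generation, component); the class and method names are made up for illustration, and the pattern should be checked against the files in your own data directory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SSTableVersionCheck
{
    // Assumed data file name: <ColumnFamily>-<version>-<generation>-Data.db
    private static final Pattern DATA_FILE = Pattern.compile("^(.+)-([a-z]+)-(\\d+)-Data\\.db$");

    // Returns the format version (for example "hc" or "hd"), or null if the name does not match.
    static String versionOf(String fileName)
    {
        Matcher m = DATA_FILE.matcher(fileName);
        return m.matches() ? m.group(2) : null;
    }

    // Lists the data files whose format version differs from the wanted one.
    static List<String> notOnVersion(List<String> fileNames, String wanted)
    {
        List<String> stale = new ArrayList<String>();
        for (String name : fileNames)
        {
            String version = versionOf(name);
            if (version != null && !version.equals(wanted))
                stale.add(name);
        }
        return stale;
    }
}
```

Feeding notOnVersion() the listing of a column family's data directory with "hd" as the wanted version gives the files that have not been rewritten in the new format yet.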

Saturday, March 28, 2015

Investigate into apache cassandra corrupt sstable exception

Today, we will take a look at another Apache Cassandra 1.0.8 exception. An example stack trace is below.
ERROR [SSTableBatchOpen:2] 2015-03-07 06:11:58,544 SSTableReader.java (line 228) Corrupt sstable /var/lib/cassandra/data/MySuperKeyspace/MyColumnFamily-hc-6681=[Index.db, Statistics.db, CompressionInfo.db, Filter.db, Data.db]; skipped
java.io.IOException: Input/output error
at java.io.RandomAccessFile.readBytes0(Native Method)
at java.io.RandomAccessFile.readBytes(RandomAccessFile.java:350)
at java.io.RandomAccessFile.read(RandomAccessFile.java:385)
at org.apache.cassandra.io.util.RandomAccessReader.reBuffer(RandomAccessReader.java:128)
at org.apache.cassandra.io.util.RandomAccessReader.read(RandomAccessReader.java:302)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:444)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:424)
at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:324)
at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:393)
at org.apache.cassandra.io.sstable.SSTableReader.load(SSTableReader.java:375)
at org.apache.cassandra.io.sstable.SSTableReader.open(SSTableReader.java:186)
at org.apache.cassandra.io.sstable.SSTableReader$1.run(SSTableReader.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Before we go into the code base for this stack trace: I had no idea what this was about, and it showed up while the Cassandra 1.0.12 instance was booting. The last thing I remember is that I triggered a user-defined compaction twice in Cassandra 1.0.8 on the same sstables, and after the first compaction was done, this sstable stayed around forever... for more than two weeks. Then we upgraded Cassandra.

Enough said, let's go into the code base and understand what a corrupt sstable really means. The bottom of the stack trace is pretty obvious: ThreadPoolExecutor executes a FutureTask's run method. From there we are in the Apache Cassandra code base, in class SSTableReader, method batchOpen(), as the code snippet below shows.
    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
                                                      final Set<DecoratedKey> savedKeys,
                                                      final DataTracker tracker,
                                                      final CFMetaData metadata,
                                                      final IPartitioner partitioner)
    {
        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();

        ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
        {
            Runnable runnable = new Runnable()
            {
                public void run()
                {
                    SSTableReader sstable;
                    try
                    {
                        sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
                    }
                    catch (IOException ex)
                    {
                        logger.error("Corrupt sstable " + entry + "; skipped", ex);
                        return;
                    }
                    sstables.add(sstable);
                }
            };
            executor.submit(runnable);
        }

        executor.shutdown();
        try
        {
            executor.awaitTermination(7, TimeUnit.DAYS);
        }
        catch (InterruptedException e)
        {
            throw new AssertionError(e);
        }

        return sstables;
    }
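
The pattern batchOpen() uses (open every item on a thread pool, log and skip the ones that fail with an IOException, return whatever did open) can be shown in isolation. A minimal sketch using only the JDK; the Opener interface and the class name are hypothetical and are not Cassandra classes, and the one-minute wait is an arbitrary choice for the example.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchOpenSketch
{
    // Hypothetical stand-in for SSTableReader.open(): may fail with an IOException.
    interface Opener
    {
        String open(String name) throws IOException;
    }

    // Opens every name on a thread pool; failures are reported and skipped,
    // so the result only contains the items that opened cleanly.
    static Collection<String> batchOpen(List<String> names, final Opener opener) throws InterruptedException
    {
        final Collection<String> opened = new LinkedBlockingQueue<String>();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        for (final String name : names)
        {
            executor.submit(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        opened.add(opener.open(name));
                    }
                    catch (IOException ex)
                    {
                        System.err.println("Corrupt item " + name + "; skipped: " + ex.getMessage());
                    }
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return opened;
    }
}
```

The consequence is that one unreadable sstable does not stop the node from starting. It is only missing from the returned collection, which is why the error shows up in the log while the instance continues to boot.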

As batchOpen() shows, somewhere within the method open() an IOException was thrown, which produced the error logged above. Two frames up the stack trace, the sstable load() method calls ByteBufferUtil.read(). The read method from class ByteBufferUtil is shown below.
    public static ByteBuffer read(DataInput in, int length) throws IOException
    {
        if (in instanceof FileDataInput)
            return ((FileDataInput) in).readBytes(length);

        byte[] buff = new byte[length];
        in.readFully(buff);
        return ByteBuffer.wrap(buff);
    }

We see that the input is an instance of FileDataInput, and the bytes are read with the given length. Since FileDataInput is an interface, we look at the class that implements it, RandomAccessReader, and its method readBytes as follows.
    public ByteBuffer readBytes(int length) throws IOException
    {
        assert length >= 0 : "buffer length should not be negative: " + length;

        byte[] buff = new byte[length];
        readFully(buff); // reading data buffer

        return ByteBuffer.wrap(buff);
    }

To read bytes with a length is actually to read fully for that length, starting from where the current file pointer is pointing. A little further up the stack trace is the method reBuffer().
    /**
     * Read data from file starting from current currentOffset to populate buffer.
     * @throws IOException on any I/O error.
     */
    protected void reBuffer() throws IOException
    {
        resetBuffer();

        if (bufferOffset >= channel.size())
            return;

        channel.position(bufferOffset); // setting channel position

        int read = 0;

        while (read < buffer.length)
        {
            int n = super.read(buffer, read, buffer.length - read);
            if (n < 0)
                break;
            read += n;
        }

        validBufferBytes = read;

        bytesSinceCacheFlush += read;

        if (skipIOCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
        {
            // with random I/O we can't control what we are skipping so
            // it will be more appropriate to just skip a whole file after
            // we reach threshold
            CLibrary.trySkipCache(this.fd, 0, 0);
            bytesSinceCacheFlush = 0;
        }
    }
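
The fill loop at the heart of reBuffer() is worth isolating, because it is where a failing read surfaces. A minimal sketch against a plain RandomAccessFile, leaving out the page cache handling; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

final class FillBufferSketch
{
    // Reads from the given offset until the buffer is full or end of file is reached,
    // and returns the number of valid bytes. Any I/O error propagates as IOException.
    static int fill(RandomAccessFile file, long offset, byte[] buffer) throws IOException
    {
        if (offset >= file.length())
            return 0;
        file.seek(offset);
        int read = 0;
        while (read < buffer.length)
        {
            int n = file.read(buffer, read, buffer.length - read);
            if (n < 0)
                break; // end of file
            read += n;
        }
        return read;
    }
}
```

A short read near the end of the file is normal and just yields fewer valid bytes. An IOException from read() is a different case: nothing in the loop handles it, so it travels up the call chain unchanged.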

This method calls the superclass to read another chunk into the buffer. The superclass is RandomAccessFile, with the method readBytes().
    /**
     * Reads a sub array as a sequence of bytes.
     * @param b the buffer into which the data is read.
     * @param off the start offset of the data.
     * @param len the number of bytes to read.
     * @exception IOException If an I/O error has occurred.
     */
    private int readBytes(byte b[], int off, int len) throws IOException {
        Object traceContext = IoTrace.fileReadBegin(path);
        int bytesRead = 0;
        try {
            bytesRead = readBytes0(b, off, len);
        } finally {
            IoTrace.fileReadEnd(traceContext, bytesRead == -1 ? 0 : bytesRead);
        }
        return bytesRead;
    }

    private native int readBytes0(byte b[], int off, int len) throws IOException;

.. and we are at the end of this path. It turns out that the call to readBytes0 threw the exception: the lower-layer native (non-Java) call threw the IOException. An "Input/output error" at this level is reported by the operating system, which usually points at the disk or filesystem under the sstable rather than at Cassandra itself. You can use nodetool scrub to see if it fixes the problem, but what I did was basically wipe the Cassandra data directory and rebuild it. After that I did not see this message anymore.

That's it for this article. If you want to improve it and/or comment, please leave your input below.

Friday, March 27, 2015

Investigate into apache cassandra get_slice assertion error

Today, we will investigate another error from Apache Cassandra. The error appears in the Cassandra log as shown below.
ERROR [Thrift:2] 2015-02-11 11:06:10,837 Cassandra.java (line 3041) Internal error processing get_slice
java.lang.AssertionError
at org.apache.cassandra.locator.TokenMetadata.firstTokenIndex(TokenMetadata.java:518)
at org.apache.cassandra.locator.TokenMetadata.firstToken(TokenMetadata.java:532)
at org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalEndpoints(AbstractReplicationStrategy.java:94)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1992)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1986)
at org.apache.cassandra.service.StorageProxy.fetchRows(StorageProxy.java:604)
at org.apache.cassandra.service.StorageProxy.read(StorageProxy.java:564)
at org.apache.cassandra.thrift.CassandraServer.readColumnFamily(CassandraServer.java:128)
at org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:283)
at org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:365)
at org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:326)
at org.apache.cassandra.thrift.Cassandra$Processor$get_slice.process(Cassandra.java:3033)
at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:2889)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:187)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

The bottom three lines are pretty easy: a thread is run by the thread pool executor. As the code snippet below indicates, a worker process had trouble processing a request.
    try
    {
        processor = processorFactory_.getProcessor(client_);
        inputTransport = inputTransportFactory_.getTransport(client_);
        outputTransport = outputTransportFactory_.getTransport(client_);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // we check stopped_ first to make sure we're not supposed to be shutting
        // down. this is necessary for graceful shutdown. (but not sufficient,
        // since process() can take arbitrarily long waiting for client input.
        // See comments at the end of serve().)
        while (!stopped_ && processor.process(inputProtocol, outputProtocol))
        {
            inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
            outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        }
    }

Skipping a few layers of low-level byte stream processing, we arrive at the class that actually implements the method get_slice. Read the code snippet below.
    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TimedOutException
    {
        logger.debug("get_slice");

        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
        return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
    }

So we see another method is called, multigetSliceInternal. Read the code snippet below, where a few validations are done on the data.
    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TimedOutException
    {
        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
        ThriftValidation.validateColumnParent(metadata, column_parent);
        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

        List<ReadCommand> commands = new ArrayList<ReadCommand>();
        if (predicate.column_names != null)
        {
            for (ByteBuffer key: keys)
            {
                ThriftValidation.validateKey(metadata, key);
                commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
            }
        }
        else
        {
            SliceRange range = predicate.slice_range;
            for (ByteBuffer key: keys)
            {
                ThriftValidation.validateKey(metadata, key);
                commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
            }
        }

        return getSlice(commands, consistency_level);
    }

Then the method getSlice is called, which in turn calls the method readColumnFamily(). The code snippet is shown below.
    protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TimedOutException
    {
        // TODO - Support multiple column families per row, right now row only contains 1 column family
        Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();

        if (consistency_level == ConsistencyLevel.ANY)
        {
            throw new InvalidRequestException("Consistency level any may not be applied to read operations");
        }

        List<Row> rows;
        try
        {
            schedule(DatabaseDescriptor.getRpcTimeout());
            try
            {
                rows = StorageProxy.read(commands, consistency_level);
            }
            finally
            {
                release();
            }
        }
        catch (TimeoutException e)
        {
            logger.debug("... timed out");
            throw new TimedOutException();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }

        for (Row row: rows)
        {
            columnFamilyKeyMap.put(row.key, row.cf);
        }
        return columnFamilyKeyMap;
    }

Another class, StorageProxy, is called to read the row concerned. The read method code snippet is below.
    /**
     * Performs the actual reading of a row out of the StorageService, fetching
     * a specific set of column names from a given column family.
     */
    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
    throws IOException, UnavailableException, TimeoutException, InvalidRequestException
    {
        if (StorageService.instance.isBootstrapMode())
            throw new UnavailableException();
        long startTime = System.nanoTime();
        List<Row> rows;
        try
        {
            rows = fetchRows(commands, consistency_level);
        }
        finally
        {
            readStats.addNano(System.nanoTime() - startTime);
        }
        return rows;
    }

The exception leads this investigation to fetching the rows, in the method fetchRows of the same class. Code snippet below.
    /**
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
* 3. Wait for a response from R replicas
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<Row> rows = new ArrayList<Row>(initialCommands.size());
List<ReadCommand> commandsToRetry = Collections.emptyList();

do
{
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];

if (!commandsToRetry.isEmpty())
logger.debug("Retrying {} commands", commandsToRetry.size());

// send out read requests
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
assert !command.isDigestQuery();
logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);

List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);

RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
readCallbacks[i] = handler;

// The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
logger.debug("reading data from {}", dataPoint);
MessagingService.instance().sendRR(command, dataPoint, handler);
}

if (handler.endpoints.size() == 1)
continue;

// send the other endpoints a digest request
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
MessageProducer producer = null;
for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
{
logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
logger.debug("reading digest from {}", digestPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (producer == null)
producer = new CachingMessageProducer(digestCommand);
MessagingService.instance().sendRR(producer, digestPoint, handler);
}
}
}

// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<RepairCallback> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
ReadCallback<Row> handler = readCallbacks[i];
ReadCommand command = commands.get(i);
try
{
long startTime2 = System.currentTimeMillis();
Row row = handler.get();
if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}

if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
catch (TimeoutException ex)
{
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);

if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
repairResponseHandlers = new ArrayList<RepairCallback>();
}
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);

MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
MessagingService.instance().sendRR(producer, endpoint, repairHandler);
}
}

if (commandsToRetry != Collections.EMPTY_LIST)
commandsToRetry.clear();

// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
{
for (int i = 0; i < repairCommands.size(); i++)
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
// behind on writes in case the out-of-sync row is read multiple times in quick succession
FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());

Row row;
try
{
row = handler.get();
}
catch (DigestMismatchException e)
{
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
}

ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
if (retryCommand != null)
{
logger.debug("issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
commandsToRetry = new ArrayList<ReadCommand>();
commandsToRetry.add(retryCommand);
continue;
}

if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}
}
}
} while (!commandsToRetry.isEmpty());

return rows;
}

At this point in the investigation, the documentation of this method, fetchRows, is pretty useful to us.
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel

We see this method actually executes reads on local and remote nodes, and the problem occurred while finding the nodes responsible for storing the row. Let's read the method getLiveNaturalEndpoints(), shown below.
    /**
     * This method attempts to return N endpoints that are responsible for storing the
     * specified key i.e for replication.
     *
     * @param key - key for which we need to find the endpoint return value -
     * the endpoint responsible for this key
     */
    public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key)
    {
        return getLiveNaturalEndpoints(table, partitioner.getToken(key));
    }

    public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
    {
        List<InetAddress> liveEps = new ArrayList<InetAddress>();
        List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);

        for (InetAddress endpoint : endpoints)
        {
            if (FailureDetector.instance.isAlive(endpoint))
                liveEps.add(endpoint);
        }

        return liveEps;
    }

A little further up the stack trace is the abstract class AbstractReplicationStrategy.
    /**
     * get the (possibly cached) endpoints that should store the given Token
     * Note that while the endpoints are conceptually a Set (no duplicates will be included),
     * we return a List to avoid an extra allocation when sorting by proximity later
     * @param searchToken the token the natural endpoints are requested for
     * @return a copy of the natural endpoints for the given token
     */
    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken)
    {
        Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
        ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
        if (endpoints == null)
        {
            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
            keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
            endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
            cacheEndpoint(keyToken, endpoints);
        }

        return new ArrayList<InetAddress>(endpoints);
    }

Somehow the ring size is 0. The assertion is thrown in class TokenMetadata, in the code snippet below.
    public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
    {
        assert ring.size() > 0;
        // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
        int i = Collections.binarySearch(ring, start);
        if (i < 0)
        {
            i = (i + 1) * (-1);
            if (i >= ring.size())
                i = insertMin ? -1 : 0;
        }
        return i;
    }

    public static Token firstToken(final ArrayList<Token> ring, Token start)
    {
        return ring.get(firstTokenIndex(ring, start, false));
    }

So something went wrong while reading a row's columns: the token ring used to look up the natural endpoints was empty. My guess is that gossip was disabled, so the ring metadata was empty. The solution was to enable gossip and then restart the Cassandra instance.
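
The lookup in firstTokenIndex() is easier to follow with concrete numbers. A minimal sketch, assuming plain Long values in place of Cassandra's Token class and an explicit exception in place of the assert (so the failure is visible without running the JVM with -ea); the class name and token values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RingLookupSketch
{
    // Same search as firstTokenIndex() with insertMin == false.
    static int firstTokenIndex(List<Long> ring, long start)
    {
        if (ring.isEmpty())
            throw new IllegalStateException("token ring is empty");
        int i = Collections.binarySearch(ring, start);
        if (i < 0)
        {
            i = (i + 1) * (-1); // insertion point: first token greater than start
            if (i >= ring.size())
                i = 0;          // past the last token: wrap around to the first
        }
        return i;
    }

    public static void main(String[] args)
    {
        List<Long> ring = Arrays.asList(10L, 20L, 30L);
        System.out.println(firstTokenIndex(ring, 15L)); // token between 10 and 20
        System.out.println(firstTokenIndex(ring, 20L)); // exact match
        System.out.println(firstTokenIndex(ring, 35L)); // past the last token, wraps around
        try
        {
            firstTokenIndex(new ArrayList<Long>(), 15L);
        }
        catch (IllegalStateException e)
        {
            System.out.println("empty ring: " + e.getMessage());
        }
    }
}
```

With the ring [10, 20, 30], a search for 15 lands on index 1, an exact match on 20 also gives index 1, and 35 wraps around to index 0. An empty ring has no answer at all, which is the situation the assertion in the stack trace guards against.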

If you think this analysis is not accurate or want to provide more information, please do so by commenting below. Thank you.

Sunday, March 15, 2015

Investigate into elasticsearch already flushing exception

Today, we will take a look at another elasticsearch issue. Below is an example of the message output in the log file on one of my nodes.
[2015-03-02 04:51:30,450][DEBUG][action.admin.indices.flush] [node02] [index_A][1], node[DEF_-ABC], [P], s[STARTED]: Failed to execute [org.elasticsearch.action.admin.indices.flush.FlushRequest@23465ba5]
org.elasticsearch.index.engine.FlushNotAllowedEngineException: [index_A][1] already flushing...
at org.elasticsearch.index.engine.robin.RobinEngine.flush(RobinEngine.java:825)
at org.elasticsearch.index.shard.service.InternalIndexShard.flush(InternalIndexShard.java:564)
at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:114)
at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:49)
at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$AsyncBroadcastAction$2.run(TransportBroadcastOperationAction.java:225)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

We will look into the code base that leads to the exception above and decide whether this is a problem. Below is a snippet of the "entry" elasticsearch code.
    public void run() {
        try {
            onOperation(shard, shardIndex, shardOperation(shardRequest));
        } catch (Throwable e) {
            onOperation(shard, shardIt, shardIndex, e);
        }
    }

As we read above, this is an asynchronous broadcast action executed while performing an action. The code execution path tells us that this shard is a local shard that encountered an exception during flushing.
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
    IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
    indexShard.flush(new Engine.Flush().type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
    return new ShardFlushResponse(request.index(), request.shardId());
}

Tracing up the stack trace, we notice that the next frame is the internal index shard class, called while the shard is being flushed.
@Override
public void flush(Engine.Flush flush) throws ElasticSearchException {
    // we allows flush while recovering, since we allow for operations to happen
    // while recovering, and we want to keep the translog at bay (up to deletes, which
    // we don't gc).
    verifyStartedOrRecovering();
    if (logger.isTraceEnabled()) {
        logger.trace("flush with {}", flush);
    }
    long time = System.nanoTime();
    engine.flush(flush);
    flushMetric.inc(System.nanoTime() - time);
}

As can be read above, engine.flush(flush) is the last call before the exception, so we are at the end of the investigation journey here. It turns out that when the flushing count of this shard is greater than one and the accompanying check is false, the flushing count is decreased and the exception is thrown to tell us that this shard is currently being flushed.
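To make the counter idea concrete, here is a minimal sketch of such a guard. It assumes a counter-based check like the one described above; the class FlushGuardSketch and its methods are hypothetical names for illustration and are not the actual RobinEngine code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an engine; not elasticsearch's RobinEngine.
class FlushGuardSketch {

    // Number of callers currently inside flush().
    private final AtomicInteger flushing = new AtomicInteger();

    // Runs the given work unless another flush is already in progress.
    void flush(Runnable work) {
        int current = flushing.incrementAndGet();
        if (current > 1) {
            // Someone else is flushing: undo our increment and refuse.
            flushing.decrementAndGet();
            throw new IllegalStateException("already flushing...");
        }
        try {
            work.run();
        } finally {
            flushing.decrementAndGet();
        }
    }

    // Starts a second flush while the first is still running and
    // reports whether the second one was rejected.
    static boolean overlappingFlushIsRejected() {
        FlushGuardSketch engine = new FlushGuardSketch();
        boolean[] rejected = {false};
        engine.flush(() -> {
            try {
                engine.flush(() -> { });
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("overlapping flush rejected: " + overlappingFlushIsRejected());
    }
}
```

In this sketch the rejected caller loses nothing, because the flush it asked for is already running. That matches the conclusion that the exception looks harmless.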

Why this happened is puzzling, but I have not seen this exception since, and from the investigation above it does not look harmful, because the shard in question was already being flushed.

If you think this analysis is not correct or you have better insight, please leave your comment below. Thank you.

Saturday, March 14, 2015

Investigate into elasticsearch indices memory marking shard active or inactive

If you have enabled logging for the indices memory controller (indices.memory: DEBUG in logging.yml) in elasticsearch 0.90 to see how the shards behave, you will notice messages such as the ones below quite often in the log file.
[2014-12-11 15:22:27,562][DEBUG][indices.memory           ] [es01] recalculating shard indexing buffer (reason=active/inactive[true] created/deleted[false]), total is [4.2gb] with [11] active shards, each shard set to indexing=[391.8mb], translog=[64kb]
[2014-12-11 15:23:57,562][DEBUG][indices.memory ] [es01] marking shard [index_A][7] as inactive (inactive_time[30m]) indexing wise, setting size to [500kb]
[2014-12-11 15:23:57,562][DEBUG][indices.memory ] [es01] marking shard [index_B][0] as inactive (inactive_time[30m]) indexing wise, setting size to [500kb]

Tracing into the code base, in class IndexingMemoryController, we notice that a runnable instance of ShardsIndicesStatusChecker is created and run periodically, with a default interval of 30 seconds. Reading into this class, we see that the algorithm loops through the indices service and, for each index service, checks the status of its index shards. Based on the log output above, control goes into the path below, where the shard's inactiveIndexing status is set to true.
if (!status.inactiveIndexing) {
    // mark it as inactive only if enough time has passed and there are no ongoing merges going on...
    if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
        // inactive for this amount of time, mark it
        activeToInactiveIndexingShards.add(indexShard);
        status.inactiveIndexing = true;
        activeInactiveStatusChanges = true;
        logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
    }
}

Going further down the code, because the variable activeInactiveStatusChanges has changed, the if statement below evaluates to true.
if (shardsCreatedOrDeleted || activeInactiveStatusChanges) {
    calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] created/deleted[" + shardsCreatedOrDeleted + "]");
}

Tracing into the method calcAndSetShardBuffers(), we see that the shard buffer size is calculated and kept within a range. The default minimum shard buffer is 4MB and the default maximum shard buffer is 512MB. A similar algorithm applies to the translog buffer size: the default minimum is 2KB and the default maximum is 64KB. Then a log message such as the one above is written using the code below.
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);
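The calculation described above can be sketched as follows. This is only an illustration under assumptions: the total is split evenly across the active shards and the result is kept inside the minimum/maximum range quoted in the text. The class ShardBufferSketch and the method perShardBuffer are hypothetical names, not elasticsearch code.

```java
// Hypothetical sketch of "divide the total, then keep it within a range".
class ShardBufferSketch {

    static final long KB = 1024L;
    static final long MB = 1024L * KB;

    // Defaults quoted in the text above.
    static final long MIN_SHARD_BUFFER = 4 * MB;
    static final long MAX_SHARD_BUFFER = 512 * MB;

    // Even share of the total per active shard, clamped to [min, max].
    static long perShardBuffer(long totalBytes, int activeShards) {
        if (activeShards <= 0) {
            throw new IllegalArgumentException("activeShards must be positive");
        }
        long share = totalBytes / activeShards;
        return Math.max(MIN_SHARD_BUFFER, Math.min(MAX_SHARD_BUFFER, share));
    }

    public static void main(String[] args) {
        // Roughly the 4.2gb total and 11 active shards from the log above.
        long total = (long) (4.2 * 1024 * MB);
        long perShard = perShardBuffer(total, 11);
        System.out.printf("each shard: %.1f MB%n", perShard / (double) MB);
    }
}
```

With these inputs the sketch lands at roughly 391 MB per shard, close to the indexing=[391.8mb] in the log; the small difference is expected because the logged total of 4.2gb is itself rounded.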

From this analysis, it looks to me that elasticsearch is working fine: it checks the index shard status and recalculates the shard and translog buffers accordingly. If you think this analysis is incorrect or would like to contribute more information, please leave your comment below.

Friday, March 13, 2015

check out java wait, notify and notifyAll method

If you have been programming in java for a while, you have most probably encountered methods such as wait, notify and notifyAll. Example screenshot below.

[Screenshot: java_object_notify]

So today, we will take a look at what these methods are about, along with example code that uses them. First, let's read the javadoc for these methods.

wait()
public final void wait()
                throws InterruptedException

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).

The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:

synchronized (obj) {
    while (<condition does not hold>)
        obj.wait();
    ... // Perform action appropriate to condition
}

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

Throws:
IllegalMonitorStateException - if the current thread is not the owner of the object's monitor.
InterruptedException - if any thread interrupted the current thread before or while the current thread was waiting for a notification. The interrupted status of the current thread is cleared when this exception is thrown.

notify()
public final void notify()

Wakes up a single thread that is waiting on this object's monitor. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation. A thread waits on an object's monitor by calling one of the wait methods.

The awakened thread will not be able to proceed until the current thread relinquishes the lock on this object. The awakened thread will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.

This method should only be called by a thread that is the owner of this object's monitor. A thread becomes the owner of the object's monitor in one of three ways:

  • By executing a synchronized instance method of that object.

  • By executing the body of a synchronized statement that synchronizes on the object.

  • For objects of type Class, by executing a synchronized static method of that class.

Only one thread at a time can own an object's monitor.

Throws:
IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.

notifyAll()
public final void notifyAll()

Wakes up all threads that are waiting on this object's monitor. A thread waits on an object's monitor by calling one of the wait methods.

The awakened threads will not be able to proceed until the current thread relinquishes the lock on this object. The awakened threads will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this object.

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

Throws:
IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.

A careful reader like you might notice that all these methods belong to the class Object, and remember that every object in java has the Object class as its superclass. This answers why these methods exist on all objects.

Now, it is pretty clear that these methods are used between two or more threads that interact with each other. You might ask, when or why should I use these methods? Suppose you have code where a thread sleeps for a second (or less) and then wakes up to poll/check whether a queue has been filled. This wastes resources such as expensive cpu cycles. In that case you should change the code to use these methods instead.

Okay, reasons given; now let's read a code example that uses these methods. I googled and found that many people give a calculator example, so I will also use it as my first example. >:)
public class Calculator {

    public static void main(String[] args) {

        ThreadB b = new ThreadB();
        b.start();

        synchronized (b) {
            try {
                System.out.println("Waiting for b to complete...");
                // wait in a loop: guards against spurious wakeups and
                // against b finishing before main starts waiting
                while (!b.done) {
                    b.wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("total is : " + b.total);
        }

    }

    static class ThreadB extends Thread {
        int total;
        boolean done; // guarded by this thread object's monitor

        @Override
        public void run() {
            System.out.println("calculate start");
            synchronized (this) {
                for (int i = 0; i < 100; i++) {
                    total += i;
                }
                done = true;
                notify();
            }
            System.out.println("calculate done");
        }
    }

}

output

Waiting for b to complete...
calculate start
calculate done
total is : 4950

As can be read above, we have a Calculator class with a static nested class, ThreadB, that sums the numbers 0 to 99. Notice that the thread is started in the main method. The main thread then synchronizes on object b and calls b.wait(), so it waits until it is notified. In ThreadB, the thread goes through the loop to do the summation and, once this (pretend) long-running summation is finished, it calls notify. The main thread then wakes up and prints the total, while thread b finishes. As homework, try modifying the code by removing the synchronized keyword and the wait and notify calls, then run the application again and look at the output. It will not be consistent and the result will not be as expected.

Okay, let's go to a more complex example. Imagine a simple clinic where many patients are waiting to be served by a doctor, and a nurse hands the patients to the doctor one at a time. With that said, let's lay out and read the code.
import java.util.Queue;
import java.util.Random;

public class Doctor extends Thread {

    private Queue<String> patientQueue;

    public Doctor(Queue<String> patientQueue) {
        this.patientQueue = patientQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                synchronized (patientQueue) {
                    // re-check the condition after every wakeup
                    while (patientQueue.size() == 0) {
                        patientQueue.wait();
                    }

                    String patient = patientQueue.remove();

                    System.out.println("treating patient: " + patient + " total patient in queue " + patientQueue.size());
                    // note: the doctor sleeps while still holding the queue's lock
                    Thread.sleep(duration(2, 3));
                }

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    private static int duration(int min, int max) {

        Random rand = new Random();

        // nextInt is normally exclusive of the top value,
        // so add 1 to make it inclusive
        int randomNum = rand.nextInt((max - min) + 1) + min;
        randomNum *= 1000;

        return randomNum;
    }

}

import java.util.Queue;

public class Nurse extends Thread {

    static final int MAX = 10;
    private int i;

    private Queue<String> patientQueue;

    public Nurse(Queue<String> patientQueue) {
        this.patientQueue = patientQueue;
    }


    @Override
    public void run() {
        try {
            while (true) {
                patientAddedToQueue();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private synchronized void patientAddedToQueue() throws InterruptedException {
        synchronized (patientQueue) {
            while (patientQueue.size() == MAX) {
                patientQueue.wait(10000);
                System.out.println("done wait");
            }
            String patientName = "patient " + i;
            patientQueue.add(patientName);
            i++;
            System.out.println("adding patient : " + patientName);
            patientQueue.notify();

        }

    }

}

import java.util.LinkedList;
import java.util.Queue;

public class Clinics {

    public static void main(String args[]) {

        Queue<String> patientQueue = new LinkedList<String>();

        Doctor veelan = new Doctor(patientQueue);
        veelan.start();

        Nurse gomathy = new Nurse(patientQueue);
        gomathy.start();

    }

}

Example output
adding patient : patient 0
adding patient : patient 1
adding patient : patient 2
adding patient : patient 3
adding patient : patient 4
adding patient : patient 5
adding patient : patient 6
adding patient : patient 7
adding patient : patient 8
adding patient : patient 9
treating patient: patient 0 total patient in queue 9
treating patient: patient 1 total patient in queue 8
treating patient: patient 2 total patient in queue 7
treating patient: patient 3 total patient in queue 6
treating patient: patient 4 total patient in queue 5
treating patient: patient 5 total patient in queue 4
treating patient: patient 6 total patient in queue 3
treating patient: patient 7 total patient in queue 2
treating patient: patient 8 total patient in queue 1
treating patient: patient 9 total patient in queue 0
done wait
adding patient : patient 10
adding patient : patient 11
adding patient : patient 12
adding patient : patient 13
adding patient : patient 14
adding patient : patient 15
adding patient : patient 16
adding patient : patient 17
adding patient : patient 18
adding patient : patient 19
treating patient: patient 10 total patient in queue 9
treating patient: patient 11 total patient in queue 8

As you can read above, we have three classes: Doctor, Nurse and Clinics. The entry point is in the class Clinics, which binds the two other classes together. A queue is shared between the Doctor and Nurse classes. In this mock-up, the nurse "accepts" new patients through the private method patientAddedToQueue; if the queue is full at 10 patients, the nurse object waits for up to 10 seconds and stops accepting new patients. Why a 10 second timeout, I will explain later. If the queue holds fewer than 10 patients, a patient is added to the queue and the method notify is called.

In the class Doctor, in the run method, we read that the object patientQueue is synchronized. If the size is zero, there is no point in calling the remove method of patientQueue, so the doctor waits instead. ;-) The doctor thread then sleeps for 2 to 3 seconds to simulate the doctor treating the patient.

Now, imagine that nurse gomathy has a queue of size 10 and is in the wait state; meanwhile patientQueue is gradually processed by doctor veelan, so the queue slowly shrinks and eventually becomes 0. At this point, both the gomathy nurse object and the veelan doctor object are in the wait state, and the doctor never calls notify. Hence, you may now figure out why the 10 second timeout is there: when it expires, gomathy starts to receive more patients. :-) Better interaction between the nurse thread and the doctor thread is probably another piece of homework for you, reader. Also try adding one more doctor and changing notify to notifyAll.
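One possible take on that homework, as a rough sketch rather than the only answer: put the waiting and notifying inside a small shared object, let both sides call notifyAll, and do the "treatment" outside the lock. The class WaitingRoomSketch and its methods admit, next and runClinic are hypothetical names, and the run is limited to a fixed number of patients so that it terminates.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical sketch: a bounded waiting room where both sides notify
// each other, so no timeout is needed.
class WaitingRoomSketch {

    private final Queue<String> patients = new LinkedList<String>();
    private final int capacity;

    WaitingRoomSketch(int capacity) {
        this.capacity = capacity;
    }

    // Called by the nurse: blocks while the room is full.
    synchronized void admit(String patient) throws InterruptedException {
        while (patients.size() == capacity) {
            wait();
        }
        patients.add(patient);
        notifyAll(); // wake any doctor waiting for a patient
    }

    // Called by a doctor: blocks while the room is empty.
    synchronized String next() throws InterruptedException {
        while (patients.isEmpty()) {
            wait();
        }
        String patient = patients.remove();
        notifyAll(); // wake the nurse if she is waiting for space
        return patient;
    }

    // Runs one nurse and one doctor over `total` patients and
    // returns how many patients the doctor treated.
    static int runClinic(final int total, int capacity) {
        final WaitingRoomSketch room = new WaitingRoomSketch(capacity);
        final int[] treated = {0};
        Thread nurse = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    room.admit("patient " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread doctor = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    room.next(); // treatment would happen here, outside the lock
                    treated[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        nurse.start();
        doctor.start();
        try {
            nurse.join();
            doctor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return treated[0];
    }

    public static void main(String[] args) {
        System.out.println("treated " + runClinic(25, 10) + " patients");
    }
}
```

Because the doctor calls notifyAll after removing a patient, the nurse wakes up as soon as there is space, and because both wait calls sit inside a while loop, adding a second doctor thread that calls next() stays safe.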

Well that's it for this article. I hope you learned something, and remember to contribute back.

Sunday, March 1, 2015

Study logic elasticsearch discovery zen fd ping_timeout and ping_retries

Today, we are going to study two parameters from elasticsearch version 0.90.7, specifically

  • discovery.zen.fd.ping_timeout

  • discovery.zen.fd.ping_retries


Let's find the definition from official documentation, here and here.
There are two fault detection processes running. The first is by the master, to ping all the other nodes in the cluster and verify that they are alive. And on the other end, each node pings to master to verify if its still alive or an election process needs to be initiated.

 

ping_timeout How long to wait for a ping response, defaults to 30s.
ping_retries How many ping failures / timeouts cause a node to be considered failed. Defaults to 3.

So these settings are used by both the master node and the data nodes to detect whether a node is okay. Once a ping is sent, the time to wait for a response is 30 seconds (default), and a node is considered down/failed once the pings have failed 3 times (default). Okay, let's go into the code.

NodesFaultDetection.java

Within the class NodesFaultDetection, there is an inner class SendPingRequest. As it implements the interface Runnable, its run method will be executed by an executor. Rather than a network-level ping, objects are written and read to emulate a ping behaviour; you can read the class PingRequest for more information. As you will notice, ping_timeout is passed to the super class of PingRequest.

The essence of the logic is pretty much in the statement transportService.sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler<T> handler). You would think it would be an ICMP ping, but it is not; isReachable() is never called.

In the method handleResponse(PingResponse response), we see that the retry count is reset to 0 and this SendPingRequest object is scheduled again with the ping_interval you set earlier. In the method handleException(TransportException exp), we see that the variable retryCount is increased by one, and if the current retry count exceeds the default of 3, the node is considered dead and is removed. If the current retry count is less than the default of 3, another ping is sent with the same ping_timeout.

MasterFaultDetection.java

Master fault detection is a little different from nodes fault detection. When the public method start of MasterFaultDetection is called, followed by the method innerStart(), a MasterPinger object is created.
this.masterPinger = new MasterPinger();
// start the ping process
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);

So there is a periodic ping, every 1 second by default. When the instance of MasterPinger is run, we notice that it goes through the same process of sending the request using the transport service: transportService.sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler<T> handler)
The logic of sending this request is the same as in NodesFaultDetection. What is interesting is the methods overridden from class BaseTransportResponseHandler. In handleResponse, we see that the retry count is reset back to 0. Then another ping is scheduled.

In the overridden method handleException(TransportException exp), there are three exception checks: whether the master is no longer a master, whether the ping went to a node that is not a master, and whether the pinging node does not exist on the master. After these checks, the retry count is increased by one. If the current retry count is greater than or equal to the default of 3, the ping to the master is treated as failed and the master node is considered failed. If the current retry count is less than the default of 3, another ping is sent.
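The retry bookkeeping described in both classes can be sketched like this. It is a simplified illustration, not elasticsearch code: the class PingRetrySketch and its methods are hypothetical, and it assumes the "greater than or equal to ping_retries" comparison described for the master fault detection.

```java
// Hypothetical sketch of the retry counter; not elasticsearch's classes.
class PingRetrySketch {

    private final int pingRetries; // plays the role of discovery.zen.fd.ping_retries
    private int retryCount;

    PingRetrySketch(int pingRetries) {
        this.pingRetries = pingRetries;
    }

    // A successful ping response resets the counter.
    void onResponse() {
        retryCount = 0;
    }

    // A failed or timed-out ping bumps the counter. Returns true once
    // the node should be treated as failed.
    boolean onFailure() {
        retryCount++;
        return retryCount >= pingRetries;
    }

    // Time spent waiting if every ping runs into its full timeout,
    // assuming a retry is sent right after each failure.
    static long worstCaseSeconds(int pingRetries, long pingTimeoutSeconds) {
        return pingRetries * pingTimeoutSeconds;
    }

    public static void main(String[] args) {
        PingRetrySketch tracker = new PingRetrySketch(3);
        System.out.println(tracker.onFailure()); // false, 1 failure so far
        System.out.println(tracker.onFailure()); // false, 2 failures so far
        System.out.println(tracker.onFailure()); // true, 3 failures reached
        System.out.println(worstCaseSeconds(3, 30) + " seconds"); // 90 seconds
    }
}
```

With the defaults from the documentation (ping_timeout of 30s and ping_retries of 3), this arithmetic suggests that a node which silently stops answering could take up to about 90 seconds to be declared failed.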

That's it. If you think this analysis needs improvement, please leave your comment below. Thank you.