
Saturday, March 28, 2015

Investigating an apache cassandra corrupt sstable exception

Today, we will take a look at another apache cassandra 1.0.8 exception. An example stack trace is shown below.
ERROR [SSTableBatchOpen:2] 2015-03-07 06:11:58,544 SSTableReader.java (line 228) Corrupt sstable /var/lib/cassandra/data/MySuperKeyspace/MyColumnFamily-hc-6681=[Index.db, Statistics.db, CompressionInfo.db, Filter.db, Data.db]; skipped
java.io.IOException: Input/output error
at java.io.RandomAccessFile.readBytes0(Native Method)
at java.io.RandomAccessFile.readBytes(RandomAccessFile.java:350)
at java.io.RandomAccessFile.read(RandomAccessFile.java:385)
at org.apache.cassandra.io.util.RandomAccessReader.reBuffer(RandomAccessReader.java:128)
at org.apache.cassandra.io.util.RandomAccessReader.read(RandomAccessReader.java:302)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:444)
at java.io.RandomAccessFile.readFully(RandomAccessFile.java:424)
at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:324)
at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:393)
at org.apache.cassandra.io.sstable.SSTableReader.load(SSTableReader.java:375)
at org.apache.cassandra.io.sstable.SSTableReader.open(SSTableReader.java:186)
at org.apache.cassandra.io.sstable.SSTableReader$1.run(SSTableReader.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Before we go into the code base for this stack trace, some context: I had no idea what this was about, and it showed up while the cassandra 1.0.12 instance was booting up. As far as I remember, I had triggered a user-defined compaction twice in cassandra 1.0.8 on the same sstables; after the first compaction finished, that sstable stayed around forever... for more than two weeks. Then we upgraded cassandra.

Enough said, let's go into the code base and understand what is really meant by a corrupt sstable. The bottom of the stack trace is pretty obvious: a ThreadPoolExecutor executes a future task's run method. From there we are in the apache cassandra codebase, in class SSTableReader, method batchOpen(); code snippet below.
    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
final Set<DecoratedKey> savedKeys,
final DataTracker tracker,
final CFMetaData metadata,
final IPartitioner partitioner)
{
final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();

ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
{
Runnable runnable = new Runnable()
{
public void run()
{
SSTableReader sstable;
try
{
sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
}
catch (IOException ex)
{
logger.error("Corrupt sstable " + entry + "; skipped", ex);
return;
}
sstables.add(sstable);
}
};
executor.submit(runnable);
}

executor.shutdown();
try
{
executor.awaitTermination(7, TimeUnit.DAYS);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}

return sstables;

}

As can be read above, somewhere within the method open() the IOException is thrown, hence the error above was logged. Two stack frames up, we read that the sstable load() method executes and calls ByteBufferUtil.read(). The read method from class ByteBufferUtil is shown below.
    public static ByteBuffer read(DataInput in, int length) throws IOException
{
if (in instanceof FileDataInput)
return ((FileDataInput) in).readBytes(length);

byte[] buff = new byte[length];
in.readFully(buff);
return ByteBuffer.wrap(buff);
}

We see that the input is an instance of FileDataInput, and the bytes are read with the given length. Since FileDataInput is an interface, the class that implements it here is RandomAccessReader, whose readBytes method follows.
public ByteBuffer readBytes(int length) throws IOException
{
assert length >= 0 : "buffer length should not be negative: " + length;

byte[] buff = new byte[length];
readFully(buff); // reading data buffer

return ByteBuffer.wrap(buff);
}

So reading bytes with a length is actually a readFully of that length, starting from wherever the current file pointer points. A little further up the stack trace is the method reBuffer():
    /**
* Read data from file starting from current currentOffset to populate buffer.
* @throws IOException on any I/O error.
*/
protected void reBuffer() throws IOException
{
resetBuffer();

if (bufferOffset >= channel.size())
return;

channel.position(bufferOffset); // setting channel position

int read = 0;

while (read < buffer.length)
{
int n = super.read(buffer, read, buffer.length - read);
if (n < 0)
break;
read += n;
}

validBufferBytes = read;

bytesSinceCacheFlush += read;

if (skipIOCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
{
// with random I/O we can't control what we are skipping so
// it will be more appropriate to just skip a whole file after
// we reach threshold
CLibrary.trySkipCache(this.fd, 0, 0);
bytesSinceCacheFlush = 0;
}
}

This method calls the superclass to read another chunk into the buffer. In the parent class RandomAccessFile, the method readBytes():
    /**
* Reads a sub array as a sequence of bytes.
* @param b the buffer into which the data is read.
* @param off the start offset of the data.
* @param len the number of bytes to read.
* @exception IOException If an I/O error has occurred.
*/
private int readBytes(byte b[], int off, int len) throws IOException {
Object traceContext = IoTrace.fileReadBegin(path);
int bytesRead = 0;
try {
bytesRead = readBytes0(b, off, len);
} finally {
IoTrace.fileReadEnd(traceContext, bytesRead == -1 ? 0 : bytesRead);
}
return bytesRead;
}

private native int readBytes0(byte b[], int off, int len) throws IOException;

... and we are at the end of this path: it turns out the call to readBytes0 threw the exception, that is, the lower-layer native (non-java) call raised the I/O error. You can try nodetool scrub to see if that fixes the problem, but what I did was basically wipe the cassandra data directory and rebuild the node. After that I did not see this message anymore. If you want to sanity-check the data files yourself, a rough standalone checker is sketched below.
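For what it's worth, the same low-level failure can be surfaced outside of cassandra with a plain RandomAccessFile read. The sketch below is my own hypothetical checker (the directory path and class name are made up, not part of cassandra): it walks a data directory and reads every *-Data.db file end to end with readFully, so an unreadable block raises the same kind of IOException seen in the stack trace above.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class SSTableReadCheck {

    public static void main(String[] args) throws IOException {
        // directory is an assumption; point it at your own keyspace data directory
        File dir = new File(args.length > 0 ? args[0] : "/var/lib/cassandra/data/MySuperKeyspace");
        File[] files = dir.listFiles();
        if (files == null) {
            System.err.println("not a directory: " + dir);
            return;
        }

        byte[] chunk = new byte[1024 * 1024]; // read in 1MB chunks
        for (File f : files) {
            if (!f.getName().endsWith("-Data.db"))
                continue;

            RandomAccessFile raf = new RandomAccessFile(f, "r");
            try {
                long remaining = raf.length();
                while (remaining > 0) {
                    int len = (int) Math.min(chunk.length, remaining);
                    raf.readFully(chunk, 0, len); // an unreadable block throws IOException here
                    remaining -= len;
                }
                System.out.println("OK         " + f);
            } catch (IOException e) {
                System.out.println("UNREADABLE " + f + " : " + e.getMessage());
            } finally {
                raf.close();
            }
        }
    }
}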

That's it for this article and if you want to improve and/or comment, please leave your input below.

Friday, March 27, 2015

Investigating an apache cassandra get_slice assertion error

Today, we will investigate another error from apache cassandra. The error appears in the cassandra log as shown below.
ERROR [Thrift:2] 2015-02-11 11:06:10,837 Cassandra.java (line 3041) Internal error processing get_slice
java.lang.AssertionError
at org.apache.cassandra.locator.TokenMetadata.firstTokenIndex(TokenMetadata.java:518)
at org.apache.cassandra.locator.TokenMetadata.firstToken(TokenMetadata.java:532)
at org.apache.cassandra.locator.AbstractReplicationStrategy.getNaturalEndpoints(AbstractReplicationStrategy.java:94)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1992)
at org.apache.cassandra.service.StorageService.getLiveNaturalEndpoints(StorageService.java:1986)
at org.apache.cassandra.service.StorageProxy.fetchRows(StorageProxy.java:604)
at org.apache.cassandra.service.StorageProxy.read(StorageProxy.java:564)
at org.apache.cassandra.thrift.CassandraServer.readColumnFamily(CassandraServer.java:128)
at org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:283)
at org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:365)
at org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:326)
at org.apache.cassandra.thrift.Cassandra$Processor$get_slice.process(Cassandra.java:3033)
at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:2889)
at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:187)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

The bottom three lines are pretty easy: a thread is run by the thread pool executor. As the code snippet below indicates, a worker process ran into trouble while processing a request.
    try
{
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown. (but not sufficient,
// since process() can take arbitrarily long waiting for client input.
// See comments at the end of serve().)
while (!stopped_ && processor.process(inputProtocol, outputProtocol))
{
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
}
}

Skipping a few low-level byte stream processing steps, we arrive at the class that actually implements the method get_slice. Read the code snippet below.
    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
logger.debug("get_slice");

state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
}

So we see another method is called, multigetSliceInternal. Read the code snippet below, where a few validations are performed on the data.
    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
ThriftValidation.validateColumnParent(metadata, column_parent);
ThriftValidation.validatePredicate(metadata, column_parent, predicate);
ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

List<ReadCommand> commands = new ArrayList<ReadCommand>();
if (predicate.column_names != null)
{
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(metadata, key);
commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
}
}
else
{
SliceRange range = predicate.slice_range;
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(metadata, key);
commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
}
}

return getSlice(commands, consistency_level);
}

Then the method getSlice is called, which in turn calls readColumnFamily(). The code snippet is shown below.
  protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
// TODO - Support multiple column families per row, right now row only contains 1 column family
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();

if (consistency_level == ConsistencyLevel.ANY)
{
throw new InvalidRequestException("Consistency level any may not be applied to read operations");
}

List<Row> rows;
try
{
schedule(DatabaseDescriptor.getRpcTimeout());
try
{
rows = StorageProxy.read(commands, consistency_level);
}
finally
{
release();
}
}
catch (TimeoutException e)
{
logger.debug("... timed out");
throw new TimedOutException();
}
catch (IOException e)
{
throw new RuntimeException(e);
}

for (Row row: rows)
{
columnFamilyKeyMap.put(row.key, row.cf);
}
return columnFamilyKeyMap;
}

Another class, StorageProxy, is called to read the row in question; its read method is shown in the code snippet below.
    /**
* Performs the actual reading of a row out of the StorageService, fetching
* a specific set of column names from a given column family.
*/
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistency_level)
throws IOException, UnavailableException, TimeoutException, InvalidRequestException
{
if (StorageService.instance.isBootstrapMode())
throw new UnavailableException();
long startTime = System.nanoTime();
List<Row> rows;
try
{
rows = fetchRows(commands, consistency_level);
}
finally
{
readStats.addNano(System.nanoTime() - startTime);
}
return rows;
}

The exception leads this investigation to the row-fetching step within the same class, the method fetchRows; code snippet below.
    /**
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
* 3. Wait for a response from R replicas
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<Row> rows = new ArrayList<Row>(initialCommands.size());
List<ReadCommand> commandsToRetry = Collections.emptyList();

do
{
List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];

if (!commandsToRetry.isEmpty())
logger.debug("Retrying {} commands", commandsToRetry.size());

// send out read requests
for (int i = 0; i < commands.size(); i++)
{
ReadCommand command = commands.get(i);
assert !command.isDigestQuery();
logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);

List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), endpoints);

RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
readCallbacks[i] = handler;

// The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
logger.debug("reading data from {}", dataPoint);
MessagingService.instance().sendRR(command, dataPoint, handler);
}

if (handler.endpoints.size() == 1)
continue;

// send the other endpoints a digest request
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
MessageProducer producer = null;
for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
{
logger.debug("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
logger.debug("reading digest from {}", digestPoint);
// (We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.)
if (producer == null)
producer = new CachingMessageProducer(digestCommand);
MessagingService.instance().sendRR(producer, digestPoint, handler);
}
}
}

// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
List<RepairCallback> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
ReadCallback<Row> handler = readCallbacks[i];
ReadCommand command = commands.get(i);
try
{
long startTime2 = System.currentTimeMillis();
Row row = handler.get();
if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}

if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
catch (TimeoutException ex)
{
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);

if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
repairResponseHandlers = new ArrayList<RepairCallback>();
}
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);

MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
MessagingService.instance().sendRR(producer, endpoint, repairHandler);
}
}

if (commandsToRetry != Collections.EMPTY_LIST)
commandsToRetry.clear();

// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
{
for (int i = 0; i < repairCommands.size(); i++)
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
// behind on writes in case the out-of-sync row is read multiple times in quick succession
FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());

Row row;
try
{
row = handler.get();
}
catch (DigestMismatchException e)
{
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
}

ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
if (retryCommand != null)
{
logger.debug("issuing retry for read command");
if (commandsToRetry == Collections.EMPTY_LIST)
commandsToRetry = new ArrayList<ReadCommand>();
commandsToRetry.add(retryCommand);
continue;
}

if (row != null)
{
command.maybeTrim(row);
rows.add(row);
}
}
}
} while (!commandsToRetry.isEmpty());

return rows;
}

At this point of the investigation, the documentation of this method, fetchRows, is pretty useful to us.
* This function executes local and remote reads, and blocks for the results:
*
* 1. Get the replica locations, sorted by response time according to the snitch
* 2. Send a data request to the closest replica, and digest requests to either
* a) all the replicas, if read repair is enabled
* b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel

We see this method executes reads on local and remote nodes, and the problem occurs while determining which nodes are responsible for keeping the row. Let's read the method getLiveNaturalEndpoints(), shown below.
    /**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
*
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
public List<InetAddress> getLiveNaturalEndpoints(String table, ByteBuffer key)
{
return getLiveNaturalEndpoints(table, partitioner.getToken(key));
}

public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
List<InetAddress> endpoints = Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);

for (InetAddress endpoint : endpoints)
{
if (FailureDetector.instance.isAlive(endpoint))
liveEps.add(endpoint);
}

return liveEps;
}

A little higher up in the stack trace, the abstract class AbstractReplicationStrategy:
    /**
* get the (possibly cached) endpoints that should store the given Token
* Note that while the endpoints are conceptually a Set (no duplicates will be included),
* we return a List to avoid an extra allocation when sorting by proximity later
* @param searchToken the token the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken)
{
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
cacheEndpoint(keyToken, endpoints);
}

return new ArrayList<InetAddress>(endpoints);
}

Somehow the ring size is 0 (or less). In class TokenMetadata, here is the code snippet where the assertion is thrown:
    public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
// insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
int i = Collections.binarySearch(ring, start);
if (i < 0)
{
i = (i + 1) * (-1);
if (i >= ring.size())
i = insertMin ? -1 : 0;
}
return i;
}

public static Token firstToken(final ArrayList<Token> ring, Token start)
{
return ring.get(firstTokenIndex(ring, start, false));
}

So something went wrong during the read of a row's columns: the token ring is empty, so no natural endpoints can be computed. My guess is that gossip was disabled, so the ring metadata was never populated. The solution is to enable gossip and then restart the cassandra instance. A tiny standalone demo of the failing assertion follows.
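To make the failing assertion concrete, here is a tiny standalone analogue of firstTokenIndex (my own class, using Integer instead of cassandra's Token type). Run it with assertions enabled, as cassandra does, and an empty ring produces exactly the AssertionError seen in the log.

import java.util.ArrayList;
import java.util.Collections;

public class EmptyRingDemo {

    // simplified copy of TokenMetadata.firstTokenIndex, with Integer standing in for Token
    static int firstTokenIndex(ArrayList<Integer> ring, Integer start, boolean insertMin) {
        assert ring.size() > 0; // fails with AssertionError when the ring is empty
        int i = Collections.binarySearch(ring, start);
        if (i < 0) {
            i = (i + 1) * (-1);
            if (i >= ring.size())
                i = insertMin ? -1 : 0;
        }
        return i;
    }

    public static void main(String[] args) {
        // run with: java -ea EmptyRingDemo   (cassandra runs with assertions enabled)
        ArrayList<Integer> emptyRing = new ArrayList<Integer>();
        firstTokenIndex(emptyRing, 42, false); // throws java.lang.AssertionError, like the log above
    }
}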

If you think this analysis is not accurate or want to provide more information, please do so by commenting below. Thank you.

Sunday, March 15, 2015

Investigating an elasticsearch already-flushing exception

Today, we will take a look at another elasticsearch issue. The example message below appeared in the log file of one of my nodes.
[2015-03-02 04:51:30,450][DEBUG][action.admin.indices.flush] [node02] [index_A][1], node[DEF_-ABC], [P], s[STARTED]: Failed to execute [org.elasticsearch.action.admin.indices.flush.FlushRequest@23465ba5]
org.elasticsearch.index.engine.FlushNotAllowedEngineException: [index_A][1] already flushing...
at org.elasticsearch.index.engine.robin.RobinEngine.flush(RobinEngine.java:825)
at org.elasticsearch.index.shard.service.InternalIndexShard.flush(InternalIndexShard.java:564)
at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:114)
at org.elasticsearch.action.admin.indices.flush.TransportFlushAction.shardOperation(TransportFlushAction.java:49)
at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$AsyncBroadcastAction$2.run(TransportBroadcastOperationAction.java:225)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

We will investigate the code path that leads to the exception above and decide whether this is a problem. Below is the snippet of the "entry" elasticsearch code.
public void run() {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}

So, as read above, this is an asynchronous broadcast action being executed. The execution path tells us this shard is a local shard, and it encountered an exception during flushing.
    @Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
return new ShardFlushResponse(request.index(), request.shardId());
}

Tracing up the stack trace, we notice it is the internal index shard class flushing the shard.
    @Override
public void flush(Engine.Flush flush) throws ElasticSearchException {
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
verifyStartedOrRecovering();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", flush);
}
long time = System.nanoTime();
engine.flush(flush);
flushMetric.inc(System.nanoTime() - time);
}

As can be read above, we are at the end of the investigation journey: it turns out the shard's flushing count went above one (another flush was already in progress), so the count is decremented again and the exception is thrown, telling us that this shard is currently being flushed, as the simplified guard sketched below illustrates.
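Below is a simplified sketch of that guard (my own class, not the actual RobinEngine code): the engine keeps a flushing counter, and a second concurrent flush bumps the counter above one, gets rolled back, and is rejected, which is what shows up in the log as "already flushing...".

import java.util.concurrent.atomic.AtomicInteger;

public class FlushGuard {

    private final AtomicInteger flushing = new AtomicInteger();

    public void flush() {
        if (flushing.incrementAndGet() > 1) {
            flushing.decrementAndGet(); // roll back our increment and refuse the second flush
            throw new IllegalStateException("already flushing...");
        }
        try {
            // ... commit the index writer and clear the translog here ...
        } finally {
            flushing.decrementAndGet(); // flush finished, allow the next one
        }
    }
}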

As to why this happened, it is puzzling, but I have not seen the exception since, and from the investigation above it does not look harmful, since the shard in question was simply already being flushed.

If you think this analysis is not correct or you have better insight, please leave your comment below. Thank you.

Saturday, March 14, 2015

Investigating elasticsearch indices memory marking shards active or inactive

If you have enabled logging for the indices memory controller (indices.memory: DEBUG in logging.yml) in elasticsearch 0.90 to see how the shards behave, you will notice messages such as the ones below appearing quite often in the log file.
[2014-12-11 15:22:27,562][DEBUG][indices.memory           ] [es01] recalculating shard indexing buffer (reason=active/inactive[true] created/deleted[false]), total is [4.2gb] with [11] active shards, each shard set to indexing=[391.8mb], translog=[64kb]
[2014-12-11 15:23:57,562][DEBUG][indices.memory ] [es01] marking shard [index_A][7] as inactive (inactive_time[30m]) indexing wise, setting size to [500kb]
[2014-12-11 15:23:57,562][DEBUG][indices.memory ] [es01] marking shard [index_B][0] as inactive (inactive_time[30m]) indexing wise, setting size to [500kb]

Tracing into the code base, in class IndexingMemoryController we notice that a runnable instance of ShardsIndicesStatusChecker is created and scheduled periodically, with a default interval of 30 seconds. Reading this class, we see that the algorithm loops through the indices service and, for each index service, checks the status of each index shard. Based on the log output above, control goes into the path below, where the shard is marked inactive for indexing.
    if (!status.inactiveIndexing) {
// mark it as inactive only if enough time has passed and there are no ongoing merges going on...
if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) {
// inactive for this amount of time, mark it
activeToInactiveIndexingShards.add(indexShard);
status.inactiveIndexing = true;
activeInactiveStatusChanges = true;
logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, Engine.INACTIVE_SHARD_INDEXING_BUFFER);
}
}

Going further down the code, because the variable activeInactiveStatusChanges changed, the if statement below evaluates to true:
if (shardsCreatedOrDeleted || activeInactiveStatusChanges) {
calcAndSetShardBuffers("active/inactive[" + activeInactiveStatusChanges + "] created/deleted[" + shardsCreatedOrDeleted + "]");
}

Tracing into the method calcAndSetShardBuffers(), we see that the shard buffer size is calculated within a range: the default minimum shard buffer is 4MB and the default maximum is 512MB. A similar calculation happens for the translog buffer size, with a default minimum of 2KB and a default maximum of 64KB. Then a log message such as the one above is written using the code below; a rough sketch of the per-shard calculation follows it.
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize, shardTranslogBufferSize);

From this analysis, it looks to me like elasticsearch is working fine: it checks the index shard status and recalculates the shard and translog buffers accordingly. If you think this analysis is incorrect or would like to contribute more information, please leave your comment below.

Friday, March 13, 2015

check out java wait, notify and notifyAll method

If you have been programming java for a while, you have most probably encountered methods such as wait, notify and notifyAll. Example screenshot below.

[screenshot: java Object notify method]

So today, we will take a look at these methods, what they are about, and example code that uses them. First, let's read the javadoc for these methods.

wait()
public final void wait()
throws InterruptedException

 

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).

 

The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.

 

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:

 

synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}

 

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

 

Throws:
IllegalMonitorStateException - if the current thread is not the owner of the object's monitor.
InterruptedException - if any thread interrupted the current thread before or while the current thread was waiting for a notification. The interrupted status of the current thread is cleared when this exception is thrown.

notify()
public final void notify()

 

Wakes up a single thread that is waiting on this object's monitor. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation. A thread waits on an object's monitor by calling one of the wait methods.

 

The awakened thread will not be able to proceed until the current thread relinquishes the lock on this object. The awakened thread will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.

 

This method should only be called by a thread that is the owner of this object's monitor. A thread becomes the owner of the object's monitor in one of three ways:

 

By executing a synchronized instance method of that object.
By executing the body of a synchronized statement that synchronizes on the object.
For objects of type Class, by executing a synchronized static method of that class.

 

Only one thread at a time can own an object's monitor.

 

Throws:
IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.

notifyAll()
public final void notifyAll()

 

Wakes up all threads that are waiting on this object's monitor. A thread waits on an object's monitor by calling one of the wait methods.

 

The awakened threads will not be able to proceed until the current thread relinquishes the lock on this object. The awakened threads will compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for example, the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this object.

 

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

 

Throws:
IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.

A careful reader like you might notice that all these methods belong to the class Object, and remember that every object in java has Object as its superclass. That answers why these methods always exist on all objects.

Now, it is pretty clear that these methods are used when two or more threads interact with each other. You might ask when or why you should use these methods. If you have code where a thread sleeps for a second (or less) and wakes up just to poll/check whether a queue is filled, that wastes resources such as expensive cpu cycles, and you should change the code to use these methods instead. A sketch of that wasteful polling pattern follows.
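For illustration, here is the kind of wasteful polling loop I mean (a made-up consumer class, not from any real codebase): the thread wakes up every second just to check whether the queue has anything in it, burning cpu even when there is no work.

import java.util.Queue;

public class PollingConsumer extends Thread {

    private final Queue<String> queue;

    public PollingConsumer(Queue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                if (!queue.isEmpty()) {
                    System.out.println("processing " + queue.remove());
                }
            }
            try {
                Thread.sleep(1000); // wasteful: sleep and re-check instead of wait()/notify()
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}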

Okay, reasons given, now let's read a code example of how these methods are used. I have googled around, and many people give a calculator example, so I will also use that as my first example. >:)
public class Calculator {

public static void main(String[] args) {

ThreadB b = new ThreadB();
b.start();

synchronized (b) {
try {
System.out.println("Waiting for b to complete...");
b.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("total is : " + b.total);
}

}

static class ThreadB extends Thread {
int total;

@Override
public void run() {
System.out.println("calculate start");
synchronized(this) {
for (int i = 0 ; i < 100; i++) {
total += i;
}
notify();
}
System.out.println("calculate done");
}
}

}

output

Waiting for b to complete...
calculate start
calculate done
total is : 4950

As can be read above, we have a Calculator class and a static inner class that sums 0 to 99. Notice that a thread is started in the main method. The main thread synchronizes on object b and waits on it until it is notified. Inside thread b, the thread goes through the loop to do the summation, and once the long-running summation is done, thread b calls notify. The total is then printed in the main method. As homework, try modifying the code by removing the synchronized keyword and the wait and notify calls, then run the application again and observe the output: it will not be consistent and the result will not be as expected. A safer variation that waits on an explicit condition is sketched after this paragraph.
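Here is one possible safer variation (my own rewrite, following the javadoc advice quoted earlier of waiting in a loop on a condition): a done flag ensures the main thread cannot wait forever if ThreadB happens to finish and call notify before the main thread reaches wait.

public class CalculatorSafe {

    public static void main(String[] args) throws InterruptedException {
        ThreadB b = new ThreadB();
        b.start();

        synchronized (b) {
            while (!b.done) { // guards against early notify and spurious wakeups
                System.out.println("Waiting for b to complete...");
                b.wait();
            }
            System.out.println("total is : " + b.total);
        }
    }

    static class ThreadB extends Thread {
        int total;
        boolean done;

        @Override
        public void run() {
            System.out.println("calculate start");
            synchronized (this) {
                for (int i = 0; i < 100; i++) {
                    total += i;
                }
                done = true;
                notify(); // wake the main thread if it is already waiting
            }
            System.out.println("calculate done");
        }
    }
}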

Okay, let's go to a more complex example. Imagine a simple clinic where many patients are waiting to be served by a doctor, and a nurse hands the patients to the doctor one at a time. With that said, let's lay out and read the code.
import java.util.Queue;
import java.util.Random;

public class Doctor extends Thread {

private Queue<String> patientQueue;

public Doctor(Queue<String> patientQueue) {
this.patientQueue = patientQueue;
}

@Override
public void run() {
try {
while (true) {
synchronized (patientQueue) {
if (patientQueue.size() == 0) {
patientQueue.wait();
}

String patient = patientQueue.remove();

System.out.println("treating patient: " + patient + " total patient in queue " + patientQueue.size());
Thread.sleep(duration(2, 3));
}

}
} catch (InterruptedException e) {
e.printStackTrace();
}
}


private static int duration(int min, int max) {

Random rand = new Random();

// nextInt is normally exclusive of the top value,
// so add 1 to make it inclusive
int randomNum = rand.nextInt((max - min) + 1) + min;
randomNum *= 1000;

return randomNum;
}

}

import java.util.Queue;

public class Nurse extends Thread {

static final int MAX = 10;
private int i;

private Queue<String> patientQueue;

public Nurse(Queue<String> patientQueue) {
this.patientQueue = patientQueue;
}


@Override
public void run() {
try {
while (true) {
patientAddedToQueue();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private synchronized void patientAddedToQueue() throws InterruptedException {
synchronized (patientQueue) {
while (patientQueue.size() == MAX) {
patientQueue.wait(10000);
System.out.println("done wait");
}
String patientName = "patient " + i;
patientQueue.add(patientName);
i++;
System.out.println("adding patient : " + patientName);
patientQueue.notify();

}

}

}

import java.util.LinkedList;
import java.util.Queue;

public class Clinics {

public static void main(String args[]) {

Queue<String> patientQueue = new LinkedList<String>();

Doctor veelan = new Doctor(patientQueue);
veelan.start();

Nurse gomathy = new Nurse(patientQueue);
gomathy.start();

}

}

Example output
adding patient : patient 0
adding patient : patient 1
adding patient : patient 2
adding patient : patient 3
adding patient : patient 4
adding patient : patient 5
adding patient : patient 6
adding patient : patient 7
adding patient : patient 8
adding patient : patient 9
treating patient: patient 0 total patient in queue 9
treating patient: patient 1 total patient in queue 8
treating patient: patient 2 total patient in queue 7
treating patient: patient 3 total patient in queue 6
treating patient: patient 4 total patient in queue 5
treating patient: patient 5 total patient in queue 4
treating patient: patient 6 total patient in queue 3
treating patient: patient 7 total patient in queue 2
treating patient: patient 8 total patient in queue 1
treating patient: patient 9 total patient in queue 0
done wait
adding patient : patient 10
adding patient : patient 11
adding patient : patient 12
adding patient : patient 13
adding patient : patient 14
adding patient : patient 15
adding patient : patient 16
adding patient : patient 17
adding patient : patient 18
adding patient : patient 19
treating patient: patient 10 total patient in queue 9
treating patient: patient 11 total patient in queue 8

As you can read above, we have three classes: Doctor, Nurse and Clinics. The entry point is the class Clinics, which binds the two other classes together. A queue is shared between the Doctor and Nurse classes. In this mock-up, the nurse "accepts" new patients in the private method patientAddedToQueue; if the queue is full at 10 patients, the nurse waits for up to 10 seconds, which stops her from accepting new patients. Why a 10-second timeout? I will explain later. If the queue holds fewer than 10 patients, a patient is added to the queue and notify is called.

In the Doctor class's run method, we read that the object patientQueue is synchronized on. If the size is zero, there is no point in calling patientQueue.remove(), as that would just make no sense. ;-) The doctor thread sleeps between 2 and 3 seconds to simulate the doctor treating a patient.

Now, imagine nurse gomathy has a queue of size 10 and is in the wait state, and meanwhile the patientQueue is processed gradually by doctor veelan, so the queue slowly comes down. Eventually the queue will become 0 as well, and at that point the gomathy nurse object and the veelan doctor object are both in the wait state. Hence, now you may figure out why, after 10 seconds, gomathy starts to receive more patients again. :-) Designing better interaction between the nurse thread and the doctor thread is probably another homework exercise for you, the reader. Try also adding one more doctor and changing notify to notifyAll; one possible direction is sketched below.
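One possible direction for that homework (a sketch of mine, not a complete answer, reusing the Doctor and Nurse classes above): start a second Doctor on the same queue, and change patientQueue.notify() in the Nurse to patientQueue.notifyAll() so that every waiting doctor gets a chance to wake up and pull a patient.

import java.util.LinkedList;
import java.util.Queue;

public class ClinicsTwoDoctors {

    public static void main(String[] args) {
        Queue<String> patientQueue = new LinkedList<String>();

        new Doctor(patientQueue).start();
        new Doctor(patientQueue).start(); // second doctor sharing the same queue

        // in Nurse.patientAddedToQueue(), use patientQueue.notifyAll() instead of notify()
        new Nurse(patientQueue).start();
    }
}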

Well, that's it for this article. I hope you learned something, and remember to contribute back.

Saturday, February 28, 2015

Implement java remote method invocation on tomcat6

Today, we will learn a bit about remote method invocation (rmi) in java. I know rmi is an old concept, but for the sake of learning nothing is old :) fun and knowledge are what matter. First, let's see what java remote method invocation is. From wikipedia:
The Java Remote Method Invocation (Java RMI) is a Java API that performs the object-oriented equivalent of remote procedure calls (RPC), with support for direct transfer of serialized Java classes and distributed garbage collection.

 

The original implementation depends on Java Virtual Machine (JVM) class representation mechanisms and it thus only supports making calls from one JVM to another. The protocol underlying this Java-only implementation is known as Java Remote Method Protocol (JRMP).
In order to support code running in a non-JVM context, a CORBA version was later developed.

 

Usage of the term RMI may denote solely the programming interface or may signify both the API and JRMP, whereas the term RMI-IIOP (read: RMI over IIOP) denotes the RMI interface delegating most of the functionality to the supporting CORBA implementation.



If that is not clear, looking one step up, java rmi is actually a java implementation of remote procedure call (rpc). Excerpt from wikipedia:
In computer science, a remote procedure call (RPC) is an inter-process communication that allows a computer program to cause a subroutine or procedure to execute in another address space (commonly on another computer on a shared network) without the programmer explicitly coding the details for this remote interaction.[1] That is, the programmer writes essentially the same code whether the subroutine is local to the executing program, or remote. When the software in question uses object-oriented principles, RPC is called remote invocation or remote method invocation.

Okay, enough of the theory; let's start a simple java rmi example using tomcat. This tutorial assumes you have a tomcat server running and know the basics of deploying a jar file into it.

First, provide access permission to the jar. If you are just starting to learn this and there are too many concepts to grasp, it is probably easiest to begin by granting all security permissions, and fine-tune once you are comfortable. You should set this in <TOMCAT_HOME>/conf/catalina.policy
  grant {
permission java.security.AllPermission;
};

Now we will code the server side. First, let's create a java interface; the server will implement it and the client will invoke its methods remotely.
import java.rmi.Remote;

public interface CalculatorInterface extends Remote {

public final String serviceName = "MyRemoteService";

public Double Add(Double num1, Double num2) throws Exception;

public Double Sub(Double num1, Double num2) throws Exception;

public Double Mul(Double num1, Double num2) throws Exception;

public Double Div(Double num1, Double num2) throws Exception;

public Integer Factorial(Integer num) throws Exception;

public Float Random() throws Exception;
}

So we have a Calculator interface which extends the Remote interface. A few public methods are exposed in this interface, and they will be invoked by the client later.
public class Calculator implements CalculatorInterface {

public Calculator() {
super();
}

@Override
public Double Add(Double num1, Double num2) throws Exception {
return num1 + num2;
}

@Override
public Double Sub(Double num1, Double num2) throws Exception {
return num1 - num2;
}

@Override
public Double Mul(Double num1, Double num2) throws Exception {
return num1 * num2;
}

@Override
public Double Div(Double num1, Double num2) throws Exception {
return num1 / num2;
}

@Override
public Integer Factorial(Integer num) throws Exception {
Integer t = 1;
for(int i = 1; i <= num;i++){
t = t * i;
}
return t;
}

@Override
public Float Random() throws Exception {
return (float) Math.random();
}

}

Here we implement the calculator; as seen, these are all basic mathematical operations. Now we will start this instance in tomcat. The easy way is probably to implement ServletContextListener and export the stub on a port when tomcat starts. With that said, let's read the code below.
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class InitCalculator implements ServletContextListener {
public static boolean isRegistered = false;
public static CalculatorInterface service;

public InitCalculator() {
if (!isRegistered) {
try {
service = new Calculator();
CalculatorInterface stub = (CalculatorInterface)UnicastRemoteObject.exportObject(service, 0);
Registry registry = LocateRegistry.createRegistry(9345);
registry.rebind(CalculatorInterface.serviceName, stub);
System.out.println("Remote service bound");
isRegistered = true;
} catch (Exception e) {
System.err.println("Remote service exception:");
e.printStackTrace();
}
}

}

@Override
public void contextDestroyed(ServletContextEvent arg0) {
// TODO Auto-generated method stub

}

@Override
public void contextInitialized(ServletContextEvent arg0) {
new InitCalculator();
System.out.println("started ...");
}

}

As seen above, when the webapp context is initialized, a new InitCalculator object is created. The service is bound to port 9345, so make sure your firewall allows it, as later you will need to access this port remotely. We create a registry on port 9345 and bind the stub to it under the service name. Very easy code. Remember to register this listener class in the tomcat web descriptor.
  <listener>
<listener-class>com.example.InitCalculator</listener-class>
</listener>

Moving on to the last piece of the puzzle, the client code.
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

public class CalculatorClient {

public static void main(String[] args) {
try {
Registry registry = LocateRegistry.getRegistry("localhost", 9345);
String[] names = registry.list();
for (String name: names) {
System.out.println("~~~~~" + name + "~~~");
}
CalculatorInterface serv = (CalculatorInterface)registry.lookup(CalculatorInterface.serviceName);
System.out.println("add total " + serv.Add(1d, 1d));
} catch (Exception e) {
e.printStackTrace();
}
}

}

As can be read above, the client code connects to localhost on port 9345 and lists what is in the registry. Then the stub is obtained with a registry lookup on the interface's service name, and we can invoke the server methods. Pretty cool stuff. :) See below.
[user@localhost ~]$ java -cp /var/lib/tomcat/webapps/example/WEB-INF/lib/example.jar:. CalculatorClient
~~~~~MyRemoteService~~~
add total 2.0

That's it.

Friday, February 13, 2015

using google guava library to hold data for report

Oftentimes when one works with a report (just a typical report), it is pretty common to need to hold a list of rows in a data type with keys and values, and maybe a page number. As a java programmer, you will encounter something like this.
public class Report  {

List<LinkedHashMap<String, String>> rows = new ArrayList<LinkedHashMap<String, String>>();
private int page;

public static void printReport(Report report) {

List<LinkedHashMap<String, String>> oldReport = report.getOld();

for (LinkedHashMap<String, String> oldRows : oldReport) {
for (Entry<String, String> entry : oldRows.entrySet()) {
System.out.print(entry.getValue());
}
}
}

}

So you have a list holding each row of the report, and within each row you have keys and values. For instance, on the first page of the report you might have a person with first name john, last name doe and age 30. Then you have another row for a person with first name dan, last name christensen and age 40, and so on. To print the report, you basically iterate over the data collections and print out the values. A small example of filling this structure follows.
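For concreteness, here is roughly how the "old" structure above gets filled and printed (the values are just the sample people from this paragraph): one LinkedHashMap per row, keyed by column name.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

public class OldReportData {

    public static void main(String[] args) {
        List<LinkedHashMap<String, String>> rows = new ArrayList<LinkedHashMap<String, String>>();

        LinkedHashMap<String, String> row1 = new LinkedHashMap<String, String>();
        row1.put("firstName", "john");
        row1.put("lastName", "doe");
        row1.put("age", "30");
        rows.add(row1);

        LinkedHashMap<String, String> row2 = new LinkedHashMap<String, String>();
        row2.put("firstName", "dan");
        row2.put("lastName", "christensen");
        row2.put("age", "40");
        rows.add(row2);

        // printing means iterating the list of rows and then each row's entries
        for (LinkedHashMap<String, String> row : rows) {
            System.out.println(row.values());
        }
    }
}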

Is there another way that is better, or more efficient?

So I googled, and people suggest using guava. I will take a look at the different features offered by guava and how they help in the situation above. So what is google guava?
The Google Guava is an open-source set of common libraries for Java, mainly developed by Google engineers.

This page gives a general overview of the common libraries found in google guava. As you will notice, there are many features included in this library, but for the report above I will use only two of them. Let's rewrite the code above.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.base.Joiner;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.LinkedHashMultiset;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;

public class Report {

private int page;
private List<String> header;
private Multimap<String, String> rows;
private Multiset<String> rowsSet;

private List<LinkedHashMap<String, String>> old = new ArrayList<LinkedHashMap<String, String>>();

public Report() {
page = 0;
header = new ArrayList<String>();
rows = LinkedHashMultimap.create();
rowsSet = LinkedHashMultiset.create();

}


public void getReportFromDS() {
header.addAll(Arrays.asList("firstName", "LastName", "age"));
rows.put("2829f395317df0f88597ef288f132827794707af", "john");
rows.put("2829f395317df0f88597ef288f132827794707af", "doe");
rows.put("2829f395317df0f88597ef288f132827794707af", "30");
rows.put("d94c2ddf2a4817e5c9a56db45d41ed876e823fcf", "dan");
rows.put("d94c2ddf2a4817e5c9a56db45d41ed876e823fcf", "christensen");
rows.put("d94c2ddf2a4817e5c9a56db45d41ed876e823fcf", "40");
rows.put("1fd23a55e9780810d2e6f0ec9ba1ddb99827e4cf", "chai");
rows.put("1fd23a55e9780810d2e6f0ec9ba1ddb99827e4cf", "lenny");
rows.put("1fd23a55e9780810d2e6f0ec9ba1ddb99827e4cf", "20");
}

public int getPage() {
return page;
}


public List<String> getHeader() {
return header;
}


public Multimap<String, String> getRows() {
return rows;
}

public List<LinkedHashMap<String, String>> getOld() {
return old;
}


public static void printReport(Report report) {
Joiner joiner = Joiner.on(", ");
String headers = joiner.join(report.getHeader());

System.out.println(headers);
Map<String, Collection<String>> rows = report.getRows().asMap();

for (Entry<String, Collection<String>> row : rows.entrySet()) {

String line = joiner.join(row.getValue().iterator());
System.out.println(line);
}

List<LinkedHashMap<String, String>> oldReport = report.getOld();

for (LinkedHashMap<String, String> oldRows : oldReport) {
for (Entry<String, String> entry : oldRows.entrySet()) {
System.out.print(entry.getValue());
}
}
}


public static void main(String[] args) {
Report sampleReport = new Report();
sampleReport.getReportFromDS();
printReport(sampleReport);
}

}

As noted from the full code above, it contains a constructor to initialize the objects, then a method getReportFromDS(), whose data you would probably get from a data source such as a database. Then we have getter methods and a static method to print the report. If you run this app, you will notice it prints out the report header and then the rows.

There is a class, Joiner, that joins the strings together in just two lines; even better, you can make it one line ;-) (see the one-liner sketched below). To print each row of the sample report, you only need a single for loop: join the values of the row and print it out. Less code and more readability. If you measure the sampleReport object, I would guess it also uses a smaller memory footprint.
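For example, the two header-joining lines in printReport() can be collapsed into a single line like this (a standalone snippet with the header values inlined):

import java.util.Arrays;

import com.google.common.base.Joiner;

public class JoinerOneLiner {

    public static void main(String[] args) {
        // join and print the header in one line
        System.out.println(Joiner.on(", ").join(Arrays.asList("firstName", "LastName", "age")));
        // prints: firstName, LastName, age
    }
}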

That's it, just two goodie features from google guava. I suggest you read about the other features offered and make full use of this great library.

Sunday, January 18, 2015

how to improve apache cassandra 1.0.8 read speed

This article is about improving read speed for apache cassandra 1.0.8. Because the read improvement is determined by many factors, we will look at all possible areas so the gains add up collectively. You may experiment with these factors and tune them to suit your node environment to achieve the best result. When cassandra 1.0 was released, the official announcement cited read performance improvements of up to 400%!

First and foremost, the numerous articles I use as references have their own copyrights; I take no ownership of nor credit for their hard work, as that rightfully belongs to them entirely. I only reference their work to improve my knowledge and to help people (like me) who need help and come to read what I share in my article.

Let's split these improvements into two parts, the hardware and the software.

hardware

ssd

An ssd is faster than an hdd for reads by multiple orders of magnitude; please see cassandra-benchmark for the benchmark. Although the cassandra version used there was 0.8.10, reads gained a tremendous improvement when cassandra 1.0 was released, so these two improvements should combine. It is also recommended to read the aforementioned article, as it explains why random seeks hurt read performance on hdd disks.

multiple disks

The disk allocated to the commit log should be different from the one holding the data directory, because during writes data is repeatedly appended to the commit log. If the data directory is located on a different drive, a read performance gain should be visible.

 

software

memtable
If the write behaviour involves a lot of updates, it is good to look into the memtable settings. There are two settings you can start with:

  • memtable_total_space_in_mb

  • commitlog_total_space_in_mb


More memory for these settings means frequent writes (updates) are absorbed in memory, and thus reads will be fast too, since a read consults the memtable first before going to the sstables. But because this has a system-wide impact, you might want to increase it gradually and measure the effect. Read the links below for more information on what these two settings are and how to tune them:
http://www.datastax.com/docs/1.0/operations/tuning#memtable-sizing
http://www.datastax.com/docs/1.0/configuration/node_configuration#memtable-total-space-in-mb
http://www.slideshare.net/driftx/cassandra-summit-2010-performance-tuning slide 14 and slide 26
http://wiki.apache.org/cassandra/MemtableThresholds

WP-DataStax-Cassandra page 16
Specifically, for read performance, Cassandra 1.0 optimizes queries by using a lighter-weight data structure for representing a row fragment from a read, than for a row fragment in a memtable into which updates accumulates. Also, with named reads, Cassandra 1.0 includes enhancements for deserializing the most recent versions of requested columns. Combined with the other optimizations, this makes reads in Cassandra as fast as writes for many workloads.

data compression

Previously I did a study on how compression affects read improvement; read here, here, here and here. Please read those links, as they provide a more comprehensive explanation than I could give here.

compaction

Compaction can improve (or hurt) read speed. Citation from WP-DataStax-Cassandra:
The above process produces exceptionally fast write operations; however it also can lead to data fragmentation across the disk. Read requests may have to combine data from many SStables as well as Memtables to satisfy end user requests for data, and this can increase query response times.

To reduce data fragmentation and reclaim space taken by obsolete data, Cassandra performs "compactions" that merge the most recent data from many different SStables on disk into a new one.

In my experience, if you trigger a (major) compaction through nodetool, read latency will increase during the compaction, but once the compaction is done, read performance improves.

This documentation explains the different compaction strategies to use for read- or write-heavy workloads. So identify your environment's write and read patterns and always measure, so you know what could go wrong and when. Choose a compaction strategy that suits your data model; and if cassandra is not strong at some requirement, choose another big data technology. Read here for a bad experience encountered.

sstables counts

Keep the sstable count as low as possible for a column family. Excerpt from FULLTEXT02 page 39:
If a read operation is performed, initially the data are read from the memtable. If data are not in the memtable, then data get read from SSTable. Multiple SSTables may be looked up to find the data. Reading directly from SSTable decreases the performance because there are many SSTable that might need to be looked at hence requires an I/O operation means it requires touching the disk. Compared to SSTable, reading directly from memtable is fast because there is no I/O involved. The more the I/O operations are involved, the more performance will be degraded. Performance can also be increased by increasing the size of memtable [7].

Cassandra uses Bloom filter to judge quickly whether the key exists in the SSTable or not before touching the disk. Bloom filter is a efficient data structure that checks whether element is a member of a set by dividing the memory into buckets. Check each bucket to see if a key is present and if any bucket is empty then key was never inserted before. If there are many SSTables, then lots of I/O operations would be needed to read the data which can definitely decrease the performance. This is because of the fact that I/O operations are expensive and therefore compaction is used to improve read performance. Compaction merge two SSTables and sort to become one SSTable, which eventually decreases the number of SSTables and number of I/O operations, hence increasing the performance [7].
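
To make the bloom filter idea concrete, here is a tiny, illustrative Java sketch of the membership test. This is a toy with two hash functions, not Cassandra's implementation.

import java.util.BitSet;

// Toy bloom filter: a bit set plus a couple of hash functions. If any probed
// bit is clear, the key was definitely never added, so the SSTable on disk
// does not need to be read at all.
public class ToyBloomFilter
{
    private static final int SIZE = 1 << 16;
    private final BitSet bits = new BitSet(SIZE);

    private int hash1(String key) { return (key.hashCode() & 0x7fffffff) % SIZE; }
    private int hash2(String key) { return ((key + "salt").hashCode() & 0x7fffffff) % SIZE; }

    public void add(String key)
    {
        bits.set(hash1(key));
        bits.set(hash2(key));
    }

    public boolean mightContain(String key)
    {
        return bits.get(hash1(key)) && bits.get(hash2(key));
    }

    public static void main(String[] args)
    {
        ToyBloomFilter filter = new ToyBloomFilter();
        filter.add("row-key-1");
        System.out.println(filter.mightContain("row-key-1")); // true
        System.out.println(filter.mightContain("row-key-2")); // almost certainly false -> skip the disk read
    }
}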

key / row cache

The key cache should be enabled to keep key lookups from touching the disk, especially a spinning disk. Excerpt from FULLTEXT02 page 47:
By default key cache is enabled and Cassandra caches 20,000 keys per Column Family (CF). The key cache decreases the Input/Output (I/O) operations because if key cache is not enabled then I/O operation is required in order to figure out the exact location of the row. Key cache holds the exact location of the data belonging to that key.
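
Conceptually, a key cache is just a bounded map from row key to the position of that row in an SSTable, so a cache hit skips the index lookup on disk. A minimal illustrative sketch (not Cassandra's key cache implementation) using an access-ordered LinkedHashMap:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative key cache: row key -> byte offset of the row in an SSTable data file.
// A hit means the read can seek straight to the data instead of consulting the
// on-disk index first.
public class ToyKeyCache extends LinkedHashMap<String, Long>
{
    private final int capacity;

    public ToyKeyCache(int capacity)
    {
        super(16, 0.75f, true); // access-order gives LRU eviction behaviour
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Long> eldest)
    {
        return size() > capacity;
    }

    public static void main(String[] args)
    {
        ToyKeyCache cache = new ToyKeyCache(20000); // 20,000 keys per CF, as the excerpt mentions
        cache.put("row-key-1", 1048576L);           // key -> byte offset in the data file
        Long offset = cache.get("row-key-1");
        System.out.println(offset != null ? "cache hit, seek to " + offset : "cache miss, go via index");
    }
}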

 

The row cache holds the entire content of a row in memory. By default, the row cache is disabled. The overhead of enabling or increasing the row cache is that it may require more Java Virtual Machine (JVM) heap for Cassandra. But if the JNA library is available, storing the row cache off-heap is a good option. This article has a diagram of how a read is performed.

concurrent_reads

Excerpt from FULLTEXT02 page 48.
Read performance can also be increased by tuning the concurrent reads. The rule is span 4 threads per Central Processing Units (CPU) core in the cluster. The higher the number of threads spanned for read, the higher performance can be achieved if the machines have got faster I/O.
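
A quick sketch of that rule of thumb; the 4 is the multiplier from the excerpt, and the result is only a starting point to measure against, not a guaranteed optimum.

public class ConcurrentReadsRuleOfThumb
{
    public static void main(String[] args)
    {
        int cores = Runtime.getRuntime().availableProcessors();
        int suggestedConcurrentReads = 4 * cores; // starting point for concurrent_reads in cassandra.yaml
        System.out.println("cores=" + cores + ", suggested concurrent_reads=" + suggestedConcurrentReads);
    }
}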

A word of caution: I tried increasing concurrent_reads from 32 to 64 and saw some unpredictable behaviour, so it is better to try this in a test environment first.

decreasing read consistency level

If your business requirements can tolerate eventual consistency, then decreasing the read consistency level from QUORUM to ONE will improve read speed, as an acknowledgement from a single node is sufficient to fulfil the read request, compared to waiting for a quorum of replica nodes. An example follows.
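
For instance, with the Thrift client that ships with Cassandra 1.0, the consistency level is passed on every read call, so dropping from QUORUM to ONE is a one-argument change. This is only a sketch: the keyspace, column family, row key and column name below are placeholders, and error handling is omitted.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ReadAtConsistencyOne
{
    public static void main(String[] args) throws Exception
    {
        TTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
        transport.open();
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        client.set_keyspace("MySuperKeyspace"); // placeholder keyspace

        ColumnParent parent = new ColumnParent("MyColumnFamily"); // placeholder column family
        SlicePredicate predicate = new SlicePredicate();
        predicate.setColumn_names(Arrays.asList(ByteBuffer.wrap("name".getBytes("UTF-8"))));

        // ConsistencyLevel.ONE: the coordinator answers after a single replica responds,
        // instead of waiting for a quorum of replicas.
        List<ColumnOrSuperColumn> columns = client.get_slice(
                ByteBuffer.wrap("row-key-1".getBytes("UTF-8")), parent, predicate, ConsistencyLevel.ONE);

        System.out.println("columns returned: " + columns.size());
        transport.close();
    }
}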

turn off swap space

When a node starts to swap due to a shortage of memory, the degradation in its responses, be it writes or reads, will be visible. Hence it is best to turn off swap and let the operating system or the JVM kill the process when it runs out of memory, rather than letting page swapping begin.

java heap

Citation from OS-8.1.3-Cassandra Installation and Configuration Guide page 33
HEAP_NEWSIZE : Size of young generation. Larger value leads to longer GC pause times while smaller value will typically lead to more expensive GC. Set in conjunction with MAX_HEAP_SIZE.

So tune it carefully, since this is pretty low level. Read this article, as it mentions a few garbage collector settings for Cassandra and its memory footprint.

upgrade

Each software release improves on or fixes previous defects, and Cassandra is no different. If an upgrade is viable, you should consider it. For instance, to quote Aaron Morton:
1.0 has key and row caches defined per CF, 1.1 has global ones which are better utilised and easier to manage. 1.2 moves bloom filters and compression meta off heap which reduces GC, which will help.  Things normally get faster.

This is also true.

monitoring

Because increases and/or decreases in data load will impact read response time, it is vital to have monitoring services running. As cited from this paper, p1724_tilmannrabl_vldb2012, page 1:
In modern enterprise systems it is not uncommon to have thousands of different metrics that are reported from a single host machine.

So monitor the crucial metrics exposed by Cassandra; for example, CPU, Java heap and I/O should give some indication when your read speed has degraded. A small example of reading one such metric follows.
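
As one small example of pulling such a metric from inside the JVM, the standard MemoryMXBean reports heap usage; this is only a sketch, and monitoring CPU or I/O would need OS-level tools or Cassandra's own JMX beans, which I leave out here.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapMetricSample
{
    public static void main(String[] args)
    {
        // Standard JVM MBean; the same value is available remotely over JMX under java.lang:type=Memory.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long usedMb = heap.getUsed() / (1024 * 1024);
        long maxMb = heap.getMax() / (1024 * 1024);
        System.out.println("heap used=" + usedMb + "MB max=" + maxMb + "MB");
    }
}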

Whilst this is knowledge collected from publicly and freely shared material, any mistakes and errors in this article are mine alone and do not reflect on those sources. Thank you, and I hope you learned something.

Saturday, January 17, 2015

Investigate into Apache Cassandra Memtable updateLiveRatio logging output

Today, we are going to study how apache cassandra 1.0.8 logs memtable statistics in the cassandra system.log. Example below:
WARN [MemoryMeter:1] 2014-10-17 07:38:15,346 Memtable.java (line 176) setting live ratio to minimum of 1.0 instead of 0.16714977001091136
INFO [MemoryMeter:1] 2014-10-17 07:38:15,346 Memtable.java (line 186) CFS(Keyspace='OpsCenter', ColumnFamily='pdps') liveRatio is 1.6474114033116056 (just-counted was 1.0). calculation took 3ms for 595 columns

What does the above logging output mean? Is the cassandra instance operating normally, given that the logging level is warn? Before we dive into the code to understand its meaning, let's first read and understand what a memtable is in the cassandra context.

Excerpts from datastax documentation

Cassandra is optimized for write throughput. Cassandra writes are first written to a commit log (for durability), and then to an in-memory table structure called a memtable. A write is successful once it is written to the commit log and memory, so there is very minimal disk I/O at the time of write. Writes are batched in memory and periodically written to disk to a persistent table structure called an SSTable (sorted string table). Memtables and SSTables are maintained per column family. Memtables are organized in sorted order by row key and flushed to SSTables sequentially (no random seeking as in relational databases).

Whenever the method updateLiveRatio() from class Memtable is called, the following code applies.
public void updateLiveRatio()
{
    if (!MemoryMeter.isInitialized())
    {
        // hack for openjdk. we log a warning about this in the startup script too.
        logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
        cfs.liveRatio = 10.0;
        return;
    }

    Runnable runnable = new Runnable()
    {
        public void run()
        {
            activelyMeasuring = Memtable.this;

            long start = System.currentTimeMillis();
            // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
            // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
            long deepSize = meter.measure(columnFamilies);
            int objects = 0;
            for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
            {
                deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
                objects += entry.getValue().getColumnCount();
            }
            double newRatio = (double) deepSize / currentThroughput.get();

            if (newRatio < MIN_SANE_LIVE_RATIO)
            {
                logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
                newRatio = MIN_SANE_LIVE_RATIO;
            }
            if (newRatio > MAX_SANE_LIVE_RATIO)
            {
                logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
                newRatio = MAX_SANE_LIVE_RATIO;
            }
            cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);

            logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
                        new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
            activelyMeasuring = null;
        }
    };

    try
    {
        meterExecutor.submit(runnable);
    }
    catch (RejectedExecutionException e)
    {
        logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
    }
}

A new Runnable is submitted to the executor meterExecutor. If this executor is not busy, it starts to measure the column family associated with this memtable. Two essential measurements are involved in the arithmetic: the deep size (keys plus values) and the column count. deepSize is the sum of the measured column family, its keys and its values. A new variable newRatio is then calculated as deepSize divided by currentThroughput.

The valid newRatio range is between 1.0 and 64.0 inclusive. When the calculated newRatio is less than 1.0, the first log line shown above appears in the cassandra system log and newRatio is reset to 1.0. The same check applies when newRatio exceeds 64.0: it is logged and newRatio is reset to the maximum valid value of 64.0. Then the column family's live ratio is updated with whichever is higher, the current value or the newly calculated newRatio.

So technically, this is nothing to really be concerned about. The live ratio measures how much in-memory (live) size the memtable occupies relative to the serialized bytes written into it. Given the current throughput, an estimate of the memtable's in-memory size can therefore be calculated from the live ratio; a rough sketch of that estimate follows. As a side note, this method updateLiveRatio() is called after a mutation has been applied to the memtable but before a flush is requested.
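
A rough sketch of that estimate, using the liveRatio value from the log line above; the throughput figure is an assumption I picked to make the arithmetic concrete, not something taken from the log.

public class LiveRatioEstimate
{
    public static void main(String[] args)
    {
        double liveRatio = 1.6474114033116056;       // cfs.liveRatio from the INFO line above
        long serializedThroughputBytes = 10000000L;  // assumed serialized bytes written into the memtable
        double estimatedLiveBytes = liveRatio * serializedThroughputBytes;
        System.out.printf("estimated in-memory memtable size: %.0f bytes%n", estimatedLiveBytes);
    }
}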

That's it for today, I hope you learned something.

Saturday, January 3, 2015

apache cassandra 1.0.8 IncompatibleClassChangeError vtable stub and AssertionError Added column does not sort as the last column

Today we will spend some time looking into two errors and see if they are really something to be concerned about. The errors are thrown while apache cassandra version 1.0.8 is running. Okay, let's go to the first error.
ERROR [ReadStage:1559] 2012-10-16 20:38:25,336 AbstractCassandraDaemon.java (line 139) Fatal exception in thread Thread[ReadStage:1559,5,main]
java.lang.IncompatibleClassChangeError: vtable stub
at org.apache.cassandra.db.AbstractColumnContainer.getColumn(AbstractColumnContainer.java:134)
at org.apache.cassandra.db.Memtable$6.computeNext(Memtable.java:402)
at org.apache.cassandra.db.Memtable$6.computeNext(Memtable.java:384)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at org.apache.cassandra.db.CollationController.collectTimeOrderedData(CollationController.java:93)
at org.apache.cassandra.db.CollationController.getTopLevelColumns(CollationController.java:62)
at org.apache.cassandra.db.ColumnFamilyStore.getTopLevelColumns(ColumnFamilyStore.java:1298)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1184)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1151)
at org.apache.cassandra.db.Table.getRow(Table.java:375)
at org.apache.cassandra.db.SliceByNamesReadCommand.getRow(SliceByNamesReadCommand.java:58)
at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:765)
at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1224)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

I guess this really is a fatal error. Let's check out what IncompatibleClassChangeError means. From the IncompatibleClassChangeError javadoc:

Thrown when an incompatible class change has occurred to some class definition. The definition of some class, on which the currently executing method depends, has since changed.

Honestly, this is really odd: this error was thrown out of nowhere and the jvm stopped. I assume nobody changed the compiled apache cassandra classes while the node instance was running. It certainly sounds odd, but I guess when the data held per node is huge, strange things start to happen. Now, let's get into the stack trace and follow its path.

The bottom three stack frames are pretty obvious: a task was run by the thread pool executor. Then we have the class StorageProxy. Within this class there is a static class LocalReadRunnable, whose runMayThrow() method appears in the trace. It seems it is trying to read a table row on the local node. The table eventually calls into the column family to retrieve columns. Tracing even deeper, line 134 of class AbstractColumnContainer reveals that no exception is thrown from there. This is like a mystery! :) The stack trace analysis and the observed jvm stop show that something is wrong. I am not sure what went wrong, but if you have any idea, please discuss it in a comment below.

Next, we look at the other error.
ERROR [CompactionExecutor:4] 2014-10-22 06:13:00,884 AbstractCassandraDaemon.java (line 139) Fatal exception in thread Thread[CompactionExecutor:4,1,main]
java.lang.AssertionError: Added column does not sort as the last column
at org.apache.cassandra.db.ArrayBackedSortedColumns.addColumn(ArrayBackedSortedColumns.java:126)
at org.apache.cassandra.db.AbstractColumnContainer.addColumn(AbstractColumnContainer.java:129)
at org.apache.cassandra.db.AbstractColumnContainer.addColumn(AbstractColumnContainer.java:124)
at org.apache.cassandra.db.ColumnFamilySerializer.deserializeColumns(ColumnFamilySerializer.java:148)
at org.apache.cassandra.io.sstable.SSTableIdentityIterator.getColumnFamilyWithColumns(SSTableIdentityIterator.java:232)
at org.apache.cassandra.db.compaction.PrecompactedRow.merge(PrecompactedRow.java:110)
at org.apache.cassandra.db.compaction.PrecompactedRow.<init>(PrecompactedRow.java:97)
at org.apache.cassandra.db.compaction.CompactionController.getCompactedRow(CompactionController.java:137)
at org.apache.cassandra.db.compaction.CompactionIterable$Reducer.getReduced(CompactionIterable.java:102)
at org.apache.cassandra.db.compaction.CompactionIterable$Reducer.getReduced(CompactionIterable.java:87)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.consume(MergeIterator.java:118)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:101)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at com.google.common.collect.Iterators$7.computeNext(Iterators.java:614)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:140)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:135)
at org.apache.cassandra.db.compaction.CompactionTask.execute(CompactionTask.java:173)
at org.apache.cassandra.db.compaction.CompactionManager$1.call(CompactionManager.java:135)
at org.apache.cassandra.db.compaction.CompactionManager$1.call(CompactionManager.java:115)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Again, this error popped out of nowhere: during node operation, the exception was simply thrown. So now we will look into what happened using the given stack trace. Similar to the above, the bottom stack frames are pretty obvious: a task was run by the thread pool executor. It is a compaction thread, and while compacting sstables, things break. Rows get reduced and columns get deserialized, and while the columns are being reassembled into a column family, an exception is thrown because a column does not sort after the last one. This is another strange phenomenon; the columns should already have been sorted, but they are not. The javadoc for the method addColumn():

AddColumn throws an exception if the column added does not sort after the last column in the map. The reasoning is that this implementation can get slower if too much insertions are done in unsorted order and right now we only use it when *all* insertion (with this method) are done in sorted order. The assertion throwing is thus a protection against performance regression without knowing about (we can revisit that decision later if we have use cases where most insert are in sorted order but a few are not).

It seems this could probably be reproduced in a development environment by hitting the "a few are not" case the javadoc describes. I guess this is not as fatal as the previous error; an illustrative sketch of the invariant follows. That's it for this analysis; please give your input or comments if you have a workaround.
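
To illustrate the kind of invariant ArrayBackedSortedColumns protects, here is a simplified Java analogue, not the Cassandra implementation: an append-only sorted list that asserts each new element sorts after the last one.

import java.util.ArrayList;
import java.util.List;

// Simplified analogue of an append-only sorted container: adding an element
// that does not sort after the current last element trips the assertion,
// just like "Added column does not sort as the last column".
public class SortedAppendList
{
    private final List<String> values = new ArrayList<String>();

    public void add(String value)
    {
        assert values.isEmpty() || values.get(values.size() - 1).compareTo(value) <= 0
            : "Added column does not sort as the last column";
        values.add(value);
    }

    public static void main(String[] args)
    {
        SortedAppendList list = new SortedAppendList();
        list.add("a");
        list.add("b");
        list.add("a"); // throws AssertionError when run with -ea, like the compaction thread above
    }
}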




 

UPDATE:

It seems the second error is fixed in https://issues.apache.org/jira/browse/CASSANDRA-5856

Friday, January 2, 2015

Android first application hello world

First Off, Happy New Year Everybody!

Perhaps there are tens or even hundreds of first-Android-application tutorials if you google. Well, today I will share mine too; hey, at the start of the year, a hello world would be nice. :) If you have programming skills in Java and use the Eclipse IDE, then you have come to the right place. In this article, I will use the Eclipse IDE to develop a hello world Android application. I figured it would be a painless few-step setup, but it took me several days to get everything going, due to my hectic life and problems encountered during this learning journey. I hope that with these explanations, you will have a better and easier learning experience than I did.

This article assumes you have already set up the Android Developer Toolkit plugin in the Eclipse IDE and followed Speed up android emulator startup in eclipse. If you have not, please refer to this link and this link respectively. Okay, let's get started; I will show the project creation steps with screenshots. I will document and describe my learning experience below, but if you want to read further on certain topics, please refer to the official documentation at this link.

1. Create a new Android Application Project. If it is not listed in the File menu, try Other... ;)

eclipse-android-new-project

eclipse-android-application-project

2. Click the Next button; a window appears to configure the new Android application project. Fill in the information as shown for this introductory course.

eclipse-android-new-application

3. Click the Next button; yet another window appears to configure the project. The default settings work fine for this introductory course.

eclipse-android-new-application-configure

4. Click the Next button; this is the setting for the application icon. I'm not a color person, but this default design works like a charm for me. :-) If you want to take the challenge, try changing the default icon.

eclipse-android-new-application-configure-icon

5. Click the Next button; the default Blank Activity is just fine to display the hello world text later.

eclipse-android-new-application-create-activity

6. Click the Next button; again, the default configuration works just fine. This is the last configuration screen.

eclipse-android-new-application-create-activity-1

7. Click the Finish button and check your workspace; it should now contain all the necessary generated files.

I remember the first time I did this setup: the tutorial never mentioned installing the Android support libraries, so once the Finish button was clicked, the project was created with errors. This is a big learning curve for someone who is new to Android, let alone someone new to Java or even to an IDE. See the screenshot below.

eclipse-android-androidManifest-missing

For your information, this project requires a support library which is available in the Android SDK Manager. Okay, now let's install the Android support library. On the Eclipse menu bar, click on Window and then Android SDK Manager. Locate the Android Support Library and install it. See the screenshots below; they should give you sufficient information to get this done.

eclipse-android-sdk-manager

 

eclipse-android-sdk-manager-accept-licence eclipse-android-sdk-manager-installing-support-package eclipse-android-sdk-manager-done-support-package

By now, Eclipse will request a restart, so restart your Eclipse. Once restarted, you should see a new project known as appcompat_v7 in the Package Explorer window. If your project still contains errors, check whether it is being built automatically: on the Eclipse menu bar, click Project and make sure Build Automatically is checked. The project should contain no errors; if errors remain, click on the Problems tab in Eclipse and identify what the error is. Because errors depend on your platform and the version of Eclipse and/or the Android SDK, if you have a problem, google it and fix it. This is left as an exercise. If everything is okay, your project should be similar to mine, as shown below.

appcompat_v7_myfirstapp eclipse-hello-world

Okay, now check the project properties to make sure everything is okay.

eclipse-android-java-build-path eclipse-android-configuration

As you can see above, I have also installed the Android 4.4W.2 SDK for my learning journey, but I have enabled Android 5.0 for this project. The referenced project appcompat_v7 is valid and exists. Both Android projects are free from errors. So far so good.

Note that there is a directory known as gen, and an educated guess says it holds automatically generated files. As such, you should not modify the files within this gen package; leave them to be generated automatically. Please note that R.java must be autogenerated for this project to compile successfully for our next journey, and if you do not see this file, you should be worried and find out where the problem is and why R.java never gets generated. Hint: don't paste in an R.java from somewhere else; fix the root cause here. The generated activity that references this R class is sketched below.
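
For illustration, the activity the ADT wizard generates typically looks something like the sketch below. MainActivity, activity_main and the com.example.myfirstapp package are the wizard's default names and may well differ in your project; the layout XML itself lives under res/layout.

package com.example.myfirstapp; // assumed wizard default, adjust to your own package

import android.os.Bundle;
import android.support.v7.app.ActionBarActivity; // from the appcompat_v7 support library installed above

public class MainActivity extends ActionBarActivity
{
    @Override
    protected void onCreate(Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);
        // R.layout.activity_main is a constant in the generated R.java; if R.java is missing,
        // this line will not compile, which is why the gen directory matters.
        setContentView(R.layout.activity_main);
    }
}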

Just before we launch the application in the Android emulator, let's look at some of the Android files.

eclipse-android-style-xml eclipse-android-androidManifest

So everything is looking good; it should now be ready to launch this hello world Android application in the Android emulator. Just like launching a simple Java app, the sequence is pretty much the same for Android. Right click on the project, then click on Run As / Debug As. Since this is your first Android application, you might not yet have configured an Android Virtual Device (AVD); you should configure one now. If you have followed the previous article, Speed up android emulator startup in eclipse, it should be a breeze here: just select that AVD. If not, you can follow the procedure using the screenshots below.

eclipse-debug-as-android-application eclipse-android-android-avd-error eclipse-android-android-device-chooser eclipse-android-avd-configuration eclipse-android-avd-configured-vd eclipse-android-start-avd eclipse-android-starting-avd eclipse-android-launching-app

Because emulator startup is slow, I would suggest leaving the emulator running for as long as you are learning to develop the application; starting the emulator on my workstation took minutes. As seen in the output below, I have configured two different AVDs, and one of the screenshots shows My First App in the Android app menu. Cool!

eclipse-android-started-avd eclipse-android-avd-screen

Try clicking on My First App in the emulator; it should display hello world. :-) That's it for this learning experience. I hope you learned something, and if you want to contribute back, you can do so by donating.

Thank you.