Showing posts with label cassandra1.2.19. Show all posts

Saturday, July 29, 2017

Reading into Apache Cassandra AntiEntropyService

Today we will take a look at the AntiEntropyService class in Apache Cassandra 1.2.19. First, let's get the source code of this class from GitHub.

The class javadoc is well written and documents the class thoroughly:

 AntiEntropyService encapsulates "validating" (hashing) individual column families,
 exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
 and then triggering repairs for disagreeing ranges.
 Every Tree conversation has an 'initiator', where valid trees are sent after generation
 and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree).
 Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
 for disagreeing ranges.
 Tree comparison and repair triggering occur in the single threaded Stage.ANTIENTROPY.
 The steps taken to enact a repair are as follows:
 1. A major compaction is triggered via nodeprobe:
   Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
     receives a TreeRequest, it will perform a readonly compaction to immediately validate
     the column family.
 2. The compaction process validates the column family by:
   Calling Validator.prepare(), which samples the column family to determine key distribution,
   Calling Validator.add() in order for every row in the column family,
   Calling Validator.complete() to indicate that all rows have been added.
     Calling complete() indicates that a valid MerkleTree has been created for the column family.
     The valid tree is returned to the requesting node via a TreeResponse.
 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
    rendezvous with / compare to:
   If the tree is local, it is cached, and compared to any trees that were received from neighbors.
   If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
     the remote tree is stored until a local tree can be generated.
   A Differencer object is enqueued for each comparison.
 4. Differencers are executed in Stage.ANTIENTROPY, to compare the two trees, and perform repair via the streaming api.
That is definitely a lot of operations for AntiEntropyService to handle. Let's first identify all the classes involved:

  • Validator
  • ValidatorSerializer
  • TreeRequestVerbHandler
  • TreeResponseVerbHandler
  • CFPair
  • TreeRequest
  • TreeRequestSerializer
  • RepairSession
  • RepairJob
  • Differencer
  • TreeResponse
  • RepairFuture
  • RequestCoordinator
  • Order
  • SequentialOrder
  • ParallelOrder

There are 16 classes in total, and they line up with what the javadoc above describes.
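To make the "compare trees, then repair disagreeing ranges" step from the javadoc more concrete, here is a minimal sketch. It is not Cassandra's MerkleTree or Differencer: it assumes each replica has already reduced its data to one hash per range, stored as a flat list rather than a tree, and the class and method names (RangeDiffSketch, disagreeingRanges) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Cassandra's MerkleTree or Differencer.
final class RangeDiffSketch
{
    // Returns the indexes of the ranges whose hashes disagree between two replicas.
    static List<Integer> disagreeingRanges(byte[][] local, byte[][] remote)
    {
        if (local.length != remote.length)
            throw new IllegalArgumentException("both replicas must use the same range split");
        List<Integer> differences = new ArrayList<Integer>();
        for (int i = 0; i < local.length; i++)
        {
            // a range needs repair only when the two hashes differ
            if (!Arrays.equals(local[i], remote[i]))
                differences.add(i);
        }
        return differences;
    }

    public static void main(String[] args)
    {
        byte[][] local  = { { 1, 2 }, { 3, 4 }, { 5, 6 } };
        byte[][] remote = { { 1, 2 }, { 9, 9 }, { 5, 6 } };
        System.out.println(disagreeingRanges(local, remote)); // prints [1]
    }
}
```

Only the ranges reported here would need to be streamed between the two nodes; ranges with matching hashes are skipped.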

AntiEntropyService is a singleton service with four statuses: started, session_success, session_failed and finished. An important method is submitRepairSession:
   /**  
    * Requests repairs for the given table and column families, and blocks until all repairs have been completed.  
    *  
    * @return Future for asynchronous call or null if there is no need to repair  
    */  
   public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)  
   {  
     RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);  
     if (session.endpoints.isEmpty())  
       return null;  
     RepairFuture futureTask = session.getFuture();  
     executor.execute(futureTask);  
     return futureTask;  
   }  

Here a new repair session is created and handed to the executor. Another static method, getNeighbors(), returns the neighbors that share the given range.
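The "return a future, or null when there is nothing to do" shape of submitRepairSession can be sketched with plain java.util.concurrent classes. This is only an illustration under assumptions: SubmitSessionSketch and its string result are hypothetical, and the real method builds a RepairSession and returns a RepairFuture instead.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical illustration of the submit pattern; not Cassandra's RepairFuture.
final class SubmitSessionSketch
{
    // Returns null when there is no neighbor to repair with, otherwise a task the caller can wait on.
    static FutureTask<String> submit(ExecutorService executor, final Set<String> endpoints)
    {
        if (endpoints.isEmpty())
            return null;
        FutureTask<String> task = new FutureTask<String>(
            () -> "synced with " + endpoints.size() + " neighbor(s)");
        executor.execute(task); // the work runs on the executor thread, not on the caller
        return task;
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            System.out.println(submit(executor, Collections.<String>emptySet()));
            // get() blocks the caller until the task has finished
            System.out.println(submit(executor, Collections.singleton("10.0.0.2")).get());
        }
        finally
        {
            executor.shutdown();
        }
    }
}
```

The caller decides whether to block: it can call get() on the returned task right away or keep the future and check it later.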

   /**  
    * Return all of the neighbors with whom we share the provided range.  
    *  
    * @param table table to repair  
    * @param toRepair token to repair  
    * @param isLocal need to use only nodes from local datacenter  
    *  
    * @return neighbors with whom we share the provided range  
    */  
   static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)  
   {  
     StorageService ss = StorageService.instance;  
     Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);  
     Range<Token> rangeSuperSet = null;  
     for (Range<Token> range : ss.getLocalRanges(table))  
     {  
       if (range.contains(toRepair))  
       {  
         rangeSuperSet = range;  
         break;  
       }  
       else if (range.intersects(toRepair))  
       {  
         throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");  
       }  
     }  
     if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))  
       return Collections.emptySet();  
     Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));  
     neighbors.remove(FBUtilities.getBroadcastAddress());  
     if (isLocal)  
     {  
       TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();  
       Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));  
       return Sets.intersection(neighbors, localEndpoints);  
     }  
     return neighbors;  
   }  
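The selection logic in getNeighbors() can be followed more easily on a toy model. The sketch below assumes simple non-wrapping numeric ranges and node names as strings; NeighborsSketch and its Range class are hypothetical, and the datacenter filter (isLocal) is left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the neighbor lookup; not Cassandra's Range<Token>.
final class NeighborsSketch
{
    // Non-wrapping range (left, right]; map lookups below rely on object identity.
    static final class Range
    {
        final long left, right;
        Range(long left, long right) { this.left = left; this.right = right; }
        boolean contains(Range o) { return left <= o.left && o.right <= right; }
        boolean intersects(Range o) { return o.left < right && left < o.right; }
    }

    static Set<String> neighbors(Map<Range, List<String>> replicaSets, List<Range> localRanges,
                                 Range toRepair, String self)
    {
        Range superSet = null;
        for (Range range : localRanges)
        {
            if (range.contains(toRepair)) { superSet = range; break; }
            else if (range.intersects(toRepair))
                throw new IllegalArgumentException("requested range is not fully contained in one local range");
        }
        if (superSet == null || !replicaSets.containsKey(superSet))
            return Collections.emptySet();
        Set<String> result = new HashSet<String>(replicaSets.get(superSet));
        result.remove(self); // a node is never its own neighbor
        return result;
    }

    public static void main(String[] args)
    {
        Range r1 = new Range(0, 100);
        Map<Range, List<String>> replicaSets = new HashMap<Range, List<String>>();
        replicaSets.put(r1, Arrays.asList("A", "B", "C"));
        System.out.println(neighbors(replicaSets, Collections.singletonList(r1), new Range(10, 50), "B"));
    }
}
```

A requested range that straddles a local range boundary is rejected outright, which mirrors the "imprecise repair" exception in the original method.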

Next, the static Validator class, which implements the Runnable interface, has the following javadoc description:

 A Strategy to handle building and validating a merkle tree for a column family.
 Lifecycle:
 1. prepare() - Initialize tree with samples.
 2. add() - 0 or more times, to add hashes to the tree.
 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
The three important methods, shown together with the rowHash() helper, are:

   public void prepare(ColumnFamilyStore cfs)  
     {  
       if (!tree.partitioner().preservesOrder())  
       {  
         // You can't beat an even tree distribution for md5  
         tree.init();  
       }  
       else  
       {  
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();  
         for (DecoratedKey sample : cfs.keySamples(request.range))  
         {  
           assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;  
           keys.add(sample);  
         }  
         if (keys.isEmpty())  
         {  
           // use an even tree distribution  
           tree.init();  
         }  
         else  
         {  
           int numkeys = keys.size();  
           Random random = new Random();  
           // sample the column family using random keys from the index  
           while (true)  
           {  
             DecoratedKey dk = keys.get(random.nextInt(numkeys));  
             if (!tree.split(dk.token))  
               break;  
           }  
         }  
       }  
       logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);  
       ranges = tree.invalids();  
     }  
     /**  
      * Called (in order) for every row present in the CF.  
      * Hashes the row, and adds it to the tree being built.  
      *  
      * There are four possible cases:  
      * 1. Token is greater than range.right (we haven't generated a range for it yet),  
      * 2. Token is less than/equal to range.left (the range was valid),  
      * 3. Token is contained in the range (the range is in progress),  
      * 4. No more invalid ranges exist.  
      *  
      * TODO: Because we only validate completely empty trees at the moment, we  
      * do not bother dealing with case 2 and case 4 should result in an error.  
      *  
      * Additionally, there is a special case for the minimum token, because  
      * although it sorts first, it is contained in the last possible range.  
      *  
      * @param row The row.  
      */  
     public void add(AbstractCompactedRow row)  
     {  
       assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;  
       assert lastKey == null || lastKey.compareTo(row.key) < 0  
           : "row " + row.key + " received out of order wrt " + lastKey;  
       lastKey = row.key;  
       if (range == null)  
         range = ranges.next();  
       // generate new ranges as long as case 1 is true  
       while (!range.contains(row.key.token))  
       {  
         // add the empty hash, and move to the next range  
         range.addHash(EMPTY_ROW);  
         range = ranges.next();  
       }  
       // case 3 must be true: mix in the hashed row  
       range.addHash(rowHash(row));  
     }  
     private MerkleTree.RowHash rowHash(AbstractCompactedRow row)  
     {  
       validated++;  
       // MerkleTree uses XOR internally, so we want lots of output bits here  
       MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");  
       row.update(digest);  
       return new MerkleTree.RowHash(row.key.token, digest.digest());  
     }  
     /**  
      * Registers the newly created tree for rendezvous in Stage.ANTIENTROPY.  
      */  
     public void complete()  
     {  
       completeTree();  
       StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);  
       logger.debug("Validated " + validated + " rows into AEService tree for " + request);  
     }  
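The core of add() is easier to see in isolation: rows arrive in token order, the validator walks forward through the ranges, and each row's SHA-256 hash is XORed into the range that contains it. The sketch below is a simplified stand-in under stated assumptions: ValidatorSketch is hypothetical, tokens are plain longs, rows are strings, and a skipped range simply keeps its all-zero hash where the original adds EMPTY_ROW.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical illustration of the add() loop; not Cassandra's Validator.
final class ValidatorSketch
{
    private final long[] rightBounds; // range i covers (rightBounds[i - 1], rightBounds[i]]
    private final byte[][] hashes;    // one accumulated 32-byte hash per range
    private int current = 0;          // index of the range currently being filled

    ValidatorSketch(long... rightBounds)
    {
        this.rightBounds = rightBounds;
        this.hashes = new byte[rightBounds.length][32];
    }

    // Rows must arrive in ascending token order, as the javadoc of add() requires.
    void add(long token, String row)
    {
        if (token > rightBounds[rightBounds.length - 1])
            throw new IllegalArgumentException(token + " is outside the validated range");
        // case 1: the token lies beyond the current range, so move to the next one
        while (token > rightBounds[current])
            current++;
        // case 3: mix the row hash into the current range with XOR
        byte[] rowHash = sha256(row);
        for (int i = 0; i < rowHash.length; i++)
            hashes[current][i] ^= rowHash[i];
    }

    byte[] hash(int range)
    {
        return hashes[range].clone();
    }

    private static byte[] sha256(String row)
    {
        try
        {
            return MessageDigest.getInstance("SHA-256").digest(row.getBytes(StandardCharsets.UTF_8));
        }
        catch (NoSuchAlgorithmException e)
        {
            throw new IllegalStateException(e); // SHA-256 is required on every Java platform
        }
    }

    public static void main(String[] args)
    {
        ValidatorSketch a = new ValidatorSketch(10, 20);
        a.add(3, "row-x");
        a.add(15, "row-y");
        ValidatorSketch b = new ValidatorSketch(10, 20);
        b.add(3, "row-x");
        b.add(15, "row-z"); // differs from replica a in the second range
        System.out.println(Arrays.equals(a.hash(0), b.hash(0)));
        System.out.println(Arrays.equals(a.hash(1), b.hash(1)));
    }
}
```

Two replicas holding the same rows end up with identical per-range hashes, so only the range containing the differing row would be flagged for repair.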

Next there is an inner static class, ValidatorSerializer, with two important methods, serialize and deserialize, which mainly serialize (or deserialize) the tree request and the Merkle tree. The next two classes, TreeRequestVerbHandler and TreeResponseVerbHandler, are pretty trivial: they handle the requests and responses from remote nodes.

Then comes another simple class, CFPair, followed by another important class, TreeRequest, with its createMessage() method. Like ValidatorSerializer, TreeRequestSerializer has two methods, serialize and deserialize.

The next class, RepairSession, is pretty important. Its javadoc reads:

Triggers repairs with all neighbors for the given table, cfs and range.
Typical lifecycle is: start() then join(). Executed in client threads.
The important method runMayThrow() describes how a repair job is performed:

     // we don't care about the return value but care about it throwing exception  
     public void runMayThrow() throws Exception  
     {  
       logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));  
       if (endpoints.isEmpty())  
       {  
         differencingDone.signalAll();  
         logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));  
         return;  
       }  
       // Checking all nodes are live  
       for (InetAddress endpoint : endpoints)  
       {  
         if (!FailureDetector.instance.isAlive(endpoint))  
         {  
           String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);  
           differencingDone.signalAll();  
           logger.error(String.format("[repair #%s] ", getName()) + message);  
           throw new IOException(message);  
         }  
         if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11 && isSequential)  
         {  
           logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint));  
           return;  
         }  
       }  
       AntiEntropyService.instance.sessions.put(getName(), this);  
       Gossiper.instance.register(this);  
       FailureDetector.instance.registerFailureDetectionEventListener(this);  
       try  
       {  
         // Create and queue a RepairJob for each column family  
         for (String cfname : cfnames)  
         {  
           RepairJob job = new RepairJob(cfname);  
           jobs.offer(job);  
           activeJobs.put(cfname, job);  
         }  
         jobs.peek().sendTreeRequests();  
         // block whatever thread started this session until all requests have been returned:  
         // if this thread dies, the session will still complete in the background  
         completed.await();  
         if (exception == null)  
         {  
           logger.info(String.format("[repair #%s] session completed successfully", getName()));  
         }  
         else  
         {  
           logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);  
           throw exception;  
         }  
       }  
       catch (InterruptedException e)  
       {  
         throw new RuntimeException("Interrupted while waiting for repair.");  
       }  
       finally  
       {  
         // mark this session as terminated  
         terminate();  
         FailureDetector.instance.unregisterFailureDetectionEventListener(this);  
         Gossiper.instance.unregister(this);  
         AntiEntropyService.instance.sessions.remove(getName());  
       }  
     }  

The next two classes, RepairJob and Differencer, are nested in RepairSession. They contain the details of calculating the difference between trees and then performing a repair. The remaining tasks are pretty trivial.

Friday, July 28, 2017

AbstractStreamSession convict stream error

Today we will take a look at the following error in Apache Cassandra 1.2.19:

 ERROR [GossipStage:1] 2016-09-21 19:38:54,486 AbstractStreamSession.java (line 108) Stream failed because /1.2.3.4 died or was restarted/removed (streams may still be active in background, but further streams won't be started)  

I was not sure how serious this is, so I took a look at the class to judge the severity of the error. Let's jump right into the class AbstractStreamSession.

  • it is an abstract class
  • it is the superclass of StreamInSession and StreamOutSession
  • when the close(boolean) method is called, the session is unregistered from the gossiper and from the failure detector
The convict(InetAddress, double) method is shown in the snippet below:

   public void convict(InetAddress endpoint, double phi)  
   {  
     if (!endpoint.equals(getHost()))  
       return;  
   
     // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.  
     if (phi < 100 * DatabaseDescriptor.getPhiConvictThreshold())  
       return;  
   
     logger.error("Stream failed because {} died or was restarted/removed (streams may still be active "  
            + "in background, but further streams won't be started)", endpoint);  
     close(false);  
   }  

 
We see that the error is logged only when the convicted endpoint is the host of this stream and the phi value is at least 100 * DatabaseDescriptor.getPhiConvictThreshold(). In that case the endpoint is considered failed, and close is called with success set to false.
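As a small worked example of that guard, the sketch below restates the two checks from the snippet as a pure function. ConvictSketch and shouldClose are hypothetical names, and the threshold value of 8 used in main is an assumption (it is the phi_convict_threshold default as far as I know; check your cassandra.yaml).

```java
// Hypothetical restatement of the two checks in convict(); not Cassandra code.
final class ConvictSketch
{
    // True only when the convicted endpoint is the stream's host and phi is very high.
    static boolean shouldClose(String endpoint, String streamHost, double phi, double phiConvictThreshold)
    {
        if (!endpoint.equals(streamHost))
            return false; // some other node was convicted, this stream is unaffected
        // the multiplier of 100 is taken from the snippet shown in this post
        return phi >= 100 * phiConvictThreshold;
    }

    public static void main(String[] args)
    {
        double threshold = 8; // assumed value, see the note above
        System.out.println(shouldClose("1.2.3.4", "1.2.3.4", 799, threshold)); // false
        System.out.println(shouldClose("1.2.3.4", "1.2.3.4", 800, threshold)); // true
        System.out.println(shouldClose("5.6.7.8", "1.2.3.4", 900, threshold)); // false
    }
}
```

With a threshold of 8, phi has to reach 800 before the stream is closed, which is far above the level at which the failure detector normally marks a node down.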

I guess this is not a serious problem, as the close method is called; if the problem persists, a restart of Cassandra should be sufficient to start a new streaming session.





Thursday, June 1, 2017

Investigating Apache Cassandra 1.2.19 sstable corruption

Previously, I investigated sstable corruption in Apache Cassandra 1.0.8 and on a fresh node right after an upgrade to Apache Cassandra 1.2. Both articles can be found here and here. Today, we will take another look, this time at a running Apache Cassandra 1.2.19 node that encountered sstable corruption. Below is the stack trace found in the Cassandra system.log:

 org.apache.cassandra.io.sstable.CorruptSSTableException: org.apache.cassandra.io.compress.CorruptBlockException: (/var/lib/cassandra/data/<KEYSPACE>/<COLUMN_FAMILY>/<KEYSPACE>-<CF>-ic-112-Data.db): corruption detected, chunk at 19042661 of length 27265.  
     at org.apache.cassandra.io.compress.CompressedRandomAccessReader.reBuffer(CompressedRandomAccessReader.java:89)  
     at org.apache.cassandra.io.compress.CompressedThrottledReader.reBuffer(CompressedThrottledReader.java:45)  
     at org.apache.cassandra.io.util.RandomAccessReader.read(RandomAccessReader.java:355)  
     at java.io.RandomAccessFile.readFully(RandomAccessFile.java:444)  
     at java.io.RandomAccessFile.readFully(RandomAccessFile.java:424)  
     at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:380)  
     at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:391)  
     at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:370)  
     at org.apache.cassandra.io.sstable.SSTableScanner$KeyScanningIterator.next(SSTableScanner.java:175)  
     at org.apache.cassandra.io.sstable.SSTableScanner$KeyScanningIterator.next(SSTableScanner.java:155)  
     at org.apache.cassandra.io.sstable.SSTableScanner.next(SSTableScanner.java:142)  
     at org.apache.cassandra.io.sstable.SSTableScanner.next(SSTableScanner.java:38)  
     at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:145)  
     at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:122)  
     at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:96)  
     at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)  
     at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)  
     at org.apache.cassandra.db.compaction.CompactionTask.runWith(CompactionTask.java:145)  
     at org.apache.cassandra.io.util.DiskAwareRunnable.runMayThrow(DiskAwareRunnable.java:48)  
     at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28)  
     at org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:58)  
     at org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:60)  
     at org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionTask.run(CompactionManager.java:208)  
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)  
     at java.util.concurrent.FutureTask.run(FutureTask.java:262)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:745)  
 Caused by: org.apache.cassandra.io.compress.CorruptBlockException: (/var/lib/cassandra/data/<KEYSPACE>/<COLUMN_FAMILY>/<KEYSPACE>-<CF>-ic-112-Data.db): corruption detected, chunk at 19042661 of length 27265.  
     at org.apache.cassandra.io.compress.CompressedRandomAccessReader.decompressChunk(CompressedRandomAccessReader.java:128)  
     at org.apache.cassandra.io.compress.CompressedRandomAccessReader.reBuffer(CompressedRandomAccessReader.java:85)  
     ... 27 more  

Okay, let's trace through the stack trace and study what actually causes this and what sstable corruption means.

     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)  
     at java.util.concurrent.FutureTask.run(FutureTask.java:262)  
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
     at java.lang.Thread.run(Thread.java:745)      

Simple, a thread is run by the executor.

     at org.apache.cassandra.db.compaction.CompactionTask.runWith(CompactionTask.java:145)  
     at org.apache.cassandra.io.util.DiskAwareRunnable.runMayThrow(DiskAwareRunnable.java:48)  
     at org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:28)  
     at org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:58)  
     at org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:60)  
     at org.apache.cassandra.db.compaction.CompactionManager$BackgroundCompactionTask.run(CompactionManager.java:208)  

We see that a background compaction task is started. Code below.

   // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables  
   // are created between task submission and execution, we execute against the most up-to-date information  
   class BackgroundCompactionTask implements Runnable  
   {  
     private final ColumnFamilyStore cfs;  
   
     BackgroundCompactionTask(ColumnFamilyStore cfs)  
     {  
       this.cfs = cfs;  
     }  
   
     public void run()  
     {  
       compactionLock.readLock().lock();  
       try  
       {  
         logger.debug("Checking {}.{}", cfs.table.name, cfs.columnFamily); // log after we get the lock so we can see delays from that if any  
         if (!cfs.isValid())  
         {  
           logger.debug("Aborting compaction for dropped CF");  
           return;  
         }  
   
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();  
         AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));  
         if (task == null)  
         {  
           logger.debug("No tasks available");  
           return;  
         }  
         task.execute(metrics);  
       }  
       finally  
       {  
         compactingCF.remove(cfs);  
         compactionLock.readLock().unlock();  
       }  
       submitBackground(cfs);  
     }  
   }  

     at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:145)  
     at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:122)  
     at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:96)  
     at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)  
     at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)  

Here we see an iterator going over the sstables for compaction.
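The idea behind a many-to-one merge over sorted sources can be sketched with a priority queue. This is only an illustration: MergeSketch and Candidate here are hypothetical, the sources are plain sorted string lists, and unlike Cassandra's MergeIterator this sketch does not combine entries with equal keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge over sorted lists; not Cassandra's MergeIterator.
final class MergeSketch
{
    // One candidate per source: its iterator plus the element currently at its head.
    static final class Candidate implements Comparable<Candidate>
    {
        final Iterator<String> source;
        String head;

        Candidate(Iterator<String> source) { this.source = source; this.head = source.next(); }

        // Moves to the next element; returns false when the source is exhausted.
        boolean advance()
        {
            if (!source.hasNext())
                return false;
            head = source.next();
            return true;
        }

        public int compareTo(Candidate other) { return head.compareTo(other.head); }
    }

    static List<String> merge(List<List<String>> sortedSources)
    {
        PriorityQueue<Candidate> queue = new PriorityQueue<Candidate>();
        for (List<String> source : sortedSources)
        {
            Iterator<String> it = source.iterator();
            if (it.hasNext())
                queue.add(new Candidate(it));
        }
        List<String> merged = new ArrayList<String>();
        while (!queue.isEmpty())
        {
            Candidate smallest = queue.poll(); // source with the smallest head
            merged.add(smallest.head);
            if (smallest.advance())            // reading the next element happens here
                queue.add(smallest);
        }
        return merged;
    }

    public static void main(String[] args)
    {
        List<List<String>> sources = Arrays.asList(Arrays.asList("a", "d"), Arrays.asList("b", "c"));
        System.out.println(merge(sources)); // prints [a, b, c, d]
    }
}
```

The advance() step is where the next element is actually read from a source, which matches the place in the stack trace where the read of the corrupt sstable is triggered.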

     at org.apache.cassandra.io.sstable.SSTableScanner$KeyScanningIterator.next(SSTableScanner.java:175)  
     at org.apache.cassandra.io.sstable.SSTableScanner$KeyScanningIterator.next(SSTableScanner.java:155)  
     at org.apache.cassandra.io.sstable.SSTableScanner.next(SSTableScanner.java:142)  
     at org.apache.cassandra.io.sstable.SSTableScanner.next(SSTableScanner.java:38)  

DecoratedKey key = sstable.decodeKey(ByteBufferUtil.readWithShortLength(dfile));
     
Here we see that what actually gets iterated is the sstable, and in particular its keys.

     at org.apache.cassandra.io.compress.CompressedRandomAccessReader.reBuffer(CompressedRandomAccessReader.java:89)  
     at org.apache.cassandra.io.compress.CompressedThrottledReader.reBuffer(CompressedThrottledReader.java:45)  
     at org.apache.cassandra.io.util.RandomAccessReader.read(RandomAccessReader.java:355)  
     at java.io.RandomAccessFile.readFully(RandomAccessFile.java:444)  
     at java.io.RandomAccessFile.readFully(RandomAccessFile.java:424)  
     at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:380)  
     at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:391)  
     at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:370)  

Here the trace passes through a few files because of method overrides; nonetheless, the important part is the reBuffer method.


   @Override  
   protected void reBuffer()  
   {  
     try  
     {  
       decompressChunk(metadata.chunkFor(current));  
     }  
     catch (CorruptBlockException e)  
     {  
       throw new CorruptSSTableException(e, getPath());  
     }  
     catch (IOException e)  
     {  
       throw new FSReadError(e, getPath());  
     }  
   }  
     
   private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException  
   {  
     if (channel.position() != chunk.offset)  
       channel.position(chunk.offset);  
   
     if (compressed.capacity() < chunk.length)  
       compressed = ByteBuffer.wrap(new byte[chunk.length]);  
     else  
       compressed.clear();  
     compressed.limit(chunk.length);  
   
     if (channel.read(compressed) != chunk.length)  
       throw new CorruptBlockException(getPath(), chunk);  
   
     // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes  
     // in the future this will save a lot of hair-pulling  
     compressed.flip();  
     try  
     {  
       validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);  
     }  
     catch (IOException e)  
     {  
       throw new CorruptBlockException(getPath(), chunk);  
     }  
   
     if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())  
     {  
       checksum.update(buffer, 0, validBufferBytes);  
   
       if (checksum(chunk) != (int) checksum.getValue())  
         throw new CorruptBlockException(getPath(), chunk);  
   
       // reset checksum object back to the original (blank) state  
       checksum.reset();  
     }  
   
     // buffer offset is always aligned  
     bufferOffset = current & ~(buffer.length - 1);  
   }  

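We read that if the chunk's stored checksum is not the same as the CRC32 checksum computed over the decompressed buffer, this is considered sstable corruption. The same CorruptBlockException is also thrown when fewer bytes than chunk.length can be read or when decompression fails.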
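The checksum comparison itself is simple to reproduce with java.util.zip.CRC32. The sketch below is a stand-alone illustration, not the Cassandra reader: ChunkChecksumSketch and its methods are hypothetical, and the "stored" checksum is just a value computed earlier in the same program rather than one read from an sstable.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration of a per-chunk CRC32 check; not Cassandra's reader.
final class ChunkChecksumSketch
{
    // CRC32 of the first 'length' bytes, narrowed to int like the snippet above does.
    static int checksumOf(byte[] data, int length)
    {
        CRC32 crc = new CRC32();
        crc.update(data, 0, length);
        return (int) crc.getValue();
    }

    // A chunk is treated as corrupt when the recomputed checksum differs from the stored one.
    static boolean isCorrupt(byte[] chunk, int storedChecksum)
    {
        return checksumOf(chunk, chunk.length) != storedChecksum;
    }

    public static void main(String[] args)
    {
        byte[] chunk = "row data".getBytes(StandardCharsets.UTF_8);
        int stored = checksumOf(chunk, chunk.length); // written alongside the chunk
        System.out.println(isCorrupt(chunk, stored)); // false
        chunk[0] ^= 0x01;                             // simulate a single flipped bit on disk
        System.out.println(isCorrupt(chunk, stored)); // true
    }
}
```

A single flipped bit is enough to change the CRC32 value, which is why failing hardware tends to surface as this exception.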

For this type of exception, I remember trying the following:

1. an online nodetool scrub, which did not work
2. an offline sstablescrub, which did not work
3. wiping out the node and rebuilding it, which did not work

In the end we had to replace the hardware altogether, and after that we did not see the problem anymore.

Friday, July 1, 2016

Yet another sstable corruption - EOFException

While starting up an Apache Cassandra 1.2 instance, I noticed the following error in the log.

 INFO 10:38:23,334 Opening /var/lib/cassandra/data/MYKEYSPACE/MYCOLUMNFAMILY/MYKEYSPACE-COLUMNFAMILY-hf-2508 (2275767 bytes)  
 ERROR 10:38:23,467 Exception in thread Thread[SSTableBatchOpen:2,5,RMI Runtime]  
 org.apache.cassandra.io.sstable.CorruptSSTableException: java.io.EOFException  
    at org.apache.cassandra.io.compress.CompressionMetadata.<init>(CompressionMetadata.java:108)  
    at org.apache.cassandra.io.compress.CompressionMetadata.create(CompressionMetadata.java:63)  
    at org.apache.cassandra.io.util.CompressedPoolingSegmentedFile$Builder.complete(CompressedPoolingSegmentedFile.java:42)  
    at org.apache.cassandra.io.sstable.SSTableReader.load(SSTableReader.java:418)  
    at org.apache.cassandra.io.sstable.SSTableReader.open(SSTableReader.java:209)  
    at org.apache.cassandra.io.sstable.SSTableReader.open(SSTableReader.java:157)  
    at org.apache.cassandra.io.sstable.SSTableReader$1.run(SSTableReader.java:273)  
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)  
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)  
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)  
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)  
    at java.lang.Thread.run(Thread.java:745)  
 Caused by: java.io.EOFException  
    at java.io.DataInputStream.readUnsignedShort(DataInputStream.java:340)  
    at java.io.DataInputStream.readUTF(DataInputStream.java:589)  
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)  
    at org.apache.cassandra.io.compress.CompressionMetadata<init>(CompressionMetadata.java:83)  
     ... 11 more  

Yes, you may have noticed that the sstable version is hf, which belongs to Cassandra 1.1: this node had just been upgraded to Cassandra 1.2, and this was its first boot on Cassandra 1.2.

Tracing the stack trace above against the Cassandra 1.2 source code, it turns out that the compression metadata cannot be opened due to file corruption. I tried nodetool upgradesstables, scrub, and restarting the Cassandra instance, but the error persisted. I guess nothing can really help in this case, so I ended up stopping the Cassandra instance, removing this data sstable together with its metadata files, and starting it up again. The error was gone, and I ran a repair.
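The bottom of the trace shows DataInputStream.readUTF failing inside readUnsignedShort, which is what happens when the stream ends before the two-byte length prefix can be read. A minimal reproduction, assuming nothing about the real file layout beyond that readUTF call (TruncatedMetadataSketch and the sample string are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical reproduction of an EOFException from readUTF; not Cassandra code.
final class TruncatedMetadataSketch
{
    // Encodes a string the way DataOutputStream.writeUTF does (length prefix + bytes).
    static byte[] encode(String value)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeUTF(value);
            return bytes.toByteArray();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the decoded string, or null when the data ends early.
    static String readName(byte[] fileBytes)
    {
        try
        {
            return new DataInputStream(new ByteArrayInputStream(fileBytes)).readUTF();
        }
        catch (EOFException e)
        {
            return null; // truncated or empty input
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        byte[] intact = encode("example-name");
        System.out.println(readName(intact));                        // example-name
        System.out.println(readName(new byte[0]));                   // null: no length prefix at all
        System.out.println(readName(Arrays.copyOf(intact, 5)));      // null: body cut short
    }
}
```

An empty or cut-off file fails in exactly this way, which fits the observation that scrub and upgradesstables could not help here.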

I hope you find this useful in your situation too.

Sunday, June 19, 2016

Investigating into apache cassandra 1.2 jmx metrics connection type warn logging

Recently I got the opportunity to upgrade a production Cassandra cluster from 1.1.12 to 1.2.19, and in the midst of upgrading I noticed the following in the log file during boot-up of a Cassandra 1.2 instance.

 WARN 10:30:14,987 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=Timeouts
 javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=Timeouts
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:412)
    at com.yammer.metrics.reporting.JmxReporter.processMeter(JmxReporter.java:16)
    at com.yammer.metrics.core.Meter.processWith(Meter.java:131)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newMeter(MetricsRegistry.java:240)
    at com.yammer.metrics.Metrics.newMeter(Metrics.java:245)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:106)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

As the logging level is WARN, I did not worry that much. Going into the code, it turns out that Cassandra 1.2 adds a set of metrics known as ConnectionMetrics. This metric is under the domain org.apache.cassandra.metrics, with type Connection and name Timeouts. It is not available in Cassandra 1.1. In the trace, JmxReporter.registerBean ends up in unregisterMBean, which throws InstanceNotFoundException because no MBean with that name is registered.
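The exception itself is easy to reproduce with the standard JMX API: unregistering a name that was never registered throws InstanceNotFoundException. The sketch below uses the platform MBean server and a made-up object name in a hypothetical org.example.metrics domain; UnregisterSketch is not part of Cassandra or the metrics library.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical reproduction of the warning's root exception; not Cassandra code.
final class UnregisterSketch
{
    // Returns true if the MBean existed and was removed, false if it was never registered.
    static boolean unregisterIfPresent(MBeanServer server, String name)
    {
        try
        {
            server.unregisterMBean(new ObjectName(name));
            return true;
        }
        catch (InstanceNotFoundException e)
        {
            return false; // the case reported by the WARN line above
        }
        catch (JMException e)
        {
            throw new IllegalStateException(e); // malformed name or failed deregistration
        }
    }

    public static void main(String[] args)
    {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        String name = "org.example.metrics:type=Connection,scope=1.2.3.4,name=Timeouts";
        System.out.println(unregisterIfPresent(server, name)); // false
    }
}
```

Seen this way, the warning only says that there was nothing to remove, which supports treating it as harmless.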

The same situation is applicable to CommandPendingTasks, ResponseCompletedTasks, ResponsePendingTasks, CommandCompletedTasks.


 WARN 10:38:58,079 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandPendingTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandPendingTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:71)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 07:52:19,882 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponseCompletedTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponseCompletedTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:99)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 09:06:07,059 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponsePendingTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=ResponsePendingTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:92)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 WARN 02:13:09,861 Error processing org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandCompletedTasks
javax.management.InstanceNotFoundException: org.apache.cassandra.metrics:type=Connection,scope=1.2.3.4,name=CommandCompletedTasks
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1095)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.exclusiveUnregisterMBean(DefaultMBeanServerInterceptor.java:427)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.unregisterMBean(DefaultMBeanServerInterceptor.java:415)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.unregisterMBean(JmxMBeanServer.java:546)
    at com.yammer.metrics.reporting.JmxReporter.registerBean(JmxReporter.java:462)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:438)
    at com.yammer.metrics.reporting.JmxReporter.processGauge(JmxReporter.java:16)
    at com.yammer.metrics.core.Gauge.processWith(Gauge.java:28)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
    at com.yammer.metrics.Metrics.newGauge(Metrics.java:70)
    at org.apache.cassandra.metrics.ConnectionMetrics.<init>(ConnectionMetrics.java:78)
    at org.apache.cassandra.net.OutboundTcpConnectionPool.<init>(OutboundTcpConnectionPool.java:53)
    at org.apache.cassandra.net.MessagingService.getConnectionPool(MessagingService.java:493)
    at org.apache.cassandra.net.MessagingService.getConnection(MessagingService.java:507)
    at org.apache.cassandra.net.MessagingService.sendOneWay(MessagingService.java:640)
    at org.apache.cassandra.net.MessagingService.sendReply(MessagingService.java:614)
    at org.apache.cassandra.db.RowMutationVerbHandler.doVerb(RowMutationVerbHandler.java:59)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

This is indeed nothing to worry about. Once you have finished upgrading all the nodes in the cluster, do another round of restarts and these warnings will disappear.