Saturday, July 29, 2017

Reading into Apache Cassandra AntiEntropyService

Today we will take a look at apache cassandra 1.2.19 AntiEntropyService class. First, let's get the source code of this class from github.

The class javadoc written this class, very well documented

 AntiEntropyService encapsulates "validating" (hashing) individual column families,
 exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
 and then triggering repairs for disagreeing ranges.
 Every Tree conversation has an 'initiator', where valid trees are sent after generation
 and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree).
 Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
 for disagreeing ranges.
 Tree comparison and repair triggering occur in the single threaded Stage.ANTIENTROPY.
 The steps taken to enact a repair are as follows:
 1. A major compaction is triggered via nodeprobe:
   Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
     receives a TreeRequest, it will perform a readonly compaction to immediately validate
     the column family.
 2. The compaction process validates the column family by:
   Calling Validator.prepare(), which samples the column family to determine key distribution,
   Calling Validator.add() in order for every row in the column family,
   Calling Validator.complete() to indicate that all rows have been added.
     Calling complete() indicates that a valid MerkleTree has been created for the column family.
     The valid tree is returned to the requesting node via a TreeResponse.
 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
    rendezvous with / compare to:
   If the tree is local, it is cached, and compared to any trees that were received from neighbors.
   If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
     the remote tree is stored until a local tree can be generated.
   A Differencer object is enqueued for each comparison.
 4. Differencers are executed in Stage.ANTIENTROPY, to compare the two trees, and perform repair via the streaming api.
That definitely a lot of operations involve in AntiEntropyService. Let's first identify all the classes

  • Validator
  • ValidatorSerializer
  • TreeRequestVerbHandler
  • TreeResponseVerbHandler
  • CFPair
  • TreeRequest
  • TreeRequestSerializer
  • RepairSession
  • RepairJob
  • Differencer
  • TreeResponse
  • RepairFuture
  • RequestCoordinator
  • Order
  • SequentialOrder
  • ParallelOrder

There are 16 classes in total and we can see that the classes is what the javadoc described above.

 AntiEntropyService is a singleton service with four status, started, session_success, session_failed and finished. An important method submitRepairSession  
   /**  
    * Requests repairs for the given table and column families, and blocks until all repairs have been completed.  
    *  
    * @return Future for asynchronous call or null if there is no need to repair  
    */  
   public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)  
   {  
     RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);  
     if (session.endpoints.isEmpty())  
       return null;  
     RepairFuture futureTask = session.getFuture();  
     executor.execute(futureTask);  
     return futureTask;  
   }  

where a new repair session is created and run by the executor. Another static method getNeighbors() where it gets neighbors that share the range.

   /**  
    * Return all of the neighbors with whom we share the provided range.  
    *  
    * @param table table to repair  
    * @param toRepair token to repair  
    * @param isLocal need to use only nodes from local datacenter  
    *  
    * @return neighbors with whom we share the provided range  
    */  
   static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)  
   {  
     StorageService ss = StorageService.instance;  
     Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);  
     Range<Token> rangeSuperSet = null;  
     for (Range<Token> range : ss.getLocalRanges(table))  
     {  
       if (range.contains(toRepair))  
       {  
         rangeSuperSet = range;  
         break;  
       }  
       else if (range.intersects(toRepair))  
       {  
         throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");  
       }  
     }  
     if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))  
       return Collections.emptySet();  
     Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));  
     neighbors.remove(FBUtilities.getBroadcastAddress());  
     if (isLocal)  
     {  
       TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();  
       Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));  
       return Sets.intersection(neighbors, localEndpoints);  
     }  
     return neighbors;  
   }  

Next, a static Validator class implement Runnable interface has the following javadoc description

 A Strategy to handle building and validating a merkle tree for a column family.
 Lifecycle:
 1. prepare() - Initialize tree with samples.
 2. add() - 0 or more times, to add hashes to the tree.
 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
And the important three methods are

   public void prepare(ColumnFamilyStore cfs)  
     {  
       if (!tree.partitioner().preservesOrder())  
       {  
         // You can't beat an even tree distribution for md5  
         tree.init();  
       }  
       else  
       {  
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();  
         for (DecoratedKey sample : cfs.keySamples(request.range))  
         {  
           assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;  
           keys.add(sample);  
         }  
         if (keys.isEmpty())  
         {  
           // use an even tree distribution  
           tree.init();  
         }  
         else  
         {  
           int numkeys = keys.size();  
           Random random = new Random();  
           // sample the column family using random keys from the index  
           while (true)  
           {  
             DecoratedKey dk = keys.get(random.nextInt(numkeys));  
             if (!tree.split(dk.token))  
               break;  
           }  
         }  
       }  
       logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);  
       ranges = tree.invalids();  
     }  
     /**  
      * Called (in order) for every row present in the CF.  
      * Hashes the row, and adds it to the tree being built.  
      *  
      * There are four possible cases:  
      * 1. Token is greater than range.right (we haven't generated a range for it yet),  
      * 2. Token is less than/equal to range.left (the range was valid),  
      * 3. Token is contained in the range (the range is in progress),  
      * 4. No more invalid ranges exist.  
      *  
      * TODO: Because we only validate completely empty trees at the moment, we  
      * do not bother dealing with case 2 and case 4 should result in an error.  
      *  
      * Additionally, there is a special case for the minimum token, because  
      * although it sorts first, it is contained in the last possible range.  
      *  
      * @param row The row.  
      */  
     public void add(AbstractCompactedRow row)  
     {  
       assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;  
       assert lastKey == null || lastKey.compareTo(row.key) < 0  
           : "row " + row.key + " received out of order wrt " + lastKey;  
       lastKey = row.key;  
       if (range == null)  
         range = ranges.next();  
       // generate new ranges as long as case 1 is true  
       while (!range.contains(row.key.token))  
       {  
         // add the empty hash, and move to the next range  
         range.addHash(EMPTY_ROW);  
         range = ranges.next();  
       }  
       // case 3 must be true: mix in the hashed row  
       range.addHash(rowHash(row));  
     }  
     private MerkleTree.RowHash rowHash(AbstractCompactedRow row)  
     {  
       validated++;  
       // MerkleTree uses XOR internally, so we want lots of output bits here  
       MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");  
       row.update(digest);  
       return new MerkleTree.RowHash(row.key.token, digest.digest());  
     }  
     /**  
      * Registers the newly created tree for rendezvous in Stage.ANTIENTROPY.  
      */  
     public void complete()  
     {  
       completeTree();  
       StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);  
       logger.debug("Validated " + validated + " rows into AEService tree for " + request);  
     }  

Then we read that there is a inner static class ValidatorSerializer with two important methods, serialize and deserialize. Mainly serialize (or deserialize) tree request and merkle tree. The next two classes, TreeRequestVerbHandler and TreeResponseVerbHandler which is pretty trivial, both handling request and response from remote nodes.

Then another simple calss CFPair . Then another important class TreeRequest with method createMessage(). Same like ValidatorSerializer, TreeRequestSerializer also has two method serialize and deserialize.

The next class RepairSession which is pretty important, the javadoc written

Triggers repairs with all neighbors for the given table, cfs and range.
Typical lifecycle is: start() then join(). Executed in client threads.
Important method runMayThrow() describe how repair job is perform

     // we don't care about the return value but care about it throwing exception  
     public void runMayThrow() throws Exception  
     {  
       logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));  
       if (endpoints.isEmpty())  
       {  
         differencingDone.signalAll();  
         logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));  
         return;  
       }  
       // Checking all nodes are live  
       for (InetAddress endpoint : endpoints)  
       {  
         if (!FailureDetector.instance.isAlive(endpoint))  
         {  
           String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);  
           differencingDone.signalAll();  
           logger.error(String.format("[repair #%s] ", getName()) + message);  
           throw new IOException(message);  
         }  
         if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11 && isSequential)  
         {  
           logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint));  
           return;  
         }  
       }  
       AntiEntropyService.instance.sessions.put(getName(), this);  
       Gossiper.instance.register(this);  
       FailureDetector.instance.registerFailureDetectionEventListener(this);  
       try  
       {  
         // Create and queue a RepairJob for each column family  
         for (String cfname : cfnames)  
         {  
           RepairJob job = new RepairJob(cfname);  
           jobs.offer(job);  
           activeJobs.put(cfname, job);  
         }  
         jobs.peek().sendTreeRequests();  
         // block whatever thread started this session until all requests have been returned:  
         // if this thread dies, the session will still complete in the background  
         completed.await();  
         if (exception == null)  
         {  
           logger.info(String.format("[repair #%s] session completed successfully", getName()));  
         }  
         else  
         {  
           logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);  
           throw exception;  
         }  
       }  
       catch (InterruptedException e)  
       {  
         throw new RuntimeException("Interrupted while waiting for repair.");  
       }  
       finally  
       {  
         // mark this session as terminated  
         terminate();  
         FailureDetector.instance.unregisterFailureDetectionEventListener(this);  
         Gossiper.instance.unregister(this);  
         AntiEntropyService.instance.sessions.remove(getName());  
       }  
     }  

The next two classes nested of RepairSession are RepairJob and Differencer. Which pretty much details to calculate the difference of trees and the preform a repair. The remaining tasks are pretty trivial.

No comments:

Post a Comment