Monday, November 6, 2017

Become a part of the RIPE Atlas community

One of my primary objectives is to give back what I have learned to the world, and in this article I am doing exactly that. Recently a good friend of mine introduced me to the RIPE Atlas community, where you can join as a member and host a probe for the benefit of better, real-time worldwide network troubleshooting.

At first I was puzzled about how it works and why I should apply to host a probe. After a demo it looked like these User Defined Measurements (UDMs) would help my work, and so I was convinced. The demo showed reports of network connectivity, from pings to SSL certificate checks, run from probes worldwide.

So I applied, and you can too! You can apply here. After some time I thought my application had been rejected because I had not gotten any response from the RIPE Atlas community; it was about one to two weeks after the application. But on 17 October 2017 I got an email from RIPE saying that they had shipped the unit! I was excited, but it took some time to reach Malaysia as the parcel travelled from the Netherlands.

On 31 October 2017, I received the parcel in my mailbox! Take a look below.


It was really easy after that: the probe ID is labelled on a sticker on the probe, and once it is registered on the site you are ready to plug the probe into the network. It was hassle-free; the unit is USB powered, and once plugged into a router network interface it took no time to be detected by the RIPE Atlas site.

You can check the probe status here. If your probe is up and serving user defined measurement requests from other users, you start to earn credits. These credits can be used for your own user defined measurements! On the second day of hosting I had earned 538k credits, which is really cool.
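
If you later want to spend those credits, measurements can also be created programmatically through the RIPE Atlas REST API. Below is a rough sketch of a one-off ping measurement via the v2 API; YOUR_API_KEY is a placeholder and the exact fields should be double-checked against the Atlas API documentation.

# create a one-off ping measurement from 5 worldwide probes (illustrative only)
curl -s -X POST "https://atlas.ripe.net/api/v2/measurements/?key=YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
        "definitions": [
          { "type": "ping", "af": 4, "target": "example.com",
            "description": "ping example.com from worldwide probes" }
        ],
        "probes": [
          { "requested": 5, "type": "area", "value": "WW" }
        ],
        "is_oneoff": true
      }'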

If you are a system or network admin, I think this will help you troubleshoot whenever you have to measure connectivity from devices worldwide.

Sunday, November 5, 2017

Going IPv6-only - Gamers, don't do this at home!

Recently I attended a talk about Cisco's IPv6-only campus building in San Jose. While their internal network is IPv6-only, they are still able to talk to IPv4 hosts using NAT64. This motivated me to try this out at home.

Current setup

I'm already running a nicely working dual-stack setup. My ISP assigns me one semi-static IPv4 address (officially it's dynamic, but it never actually changes) and a generous static /48 over DHCPv6-PD. Internally I have a bunch of DHCP/DHCPv6/SLAAC client devices and two servers hosting a few VMs with static IPv4 and IPv6 addresses.

Goals



In this experiment, I want to disable IPv4 connectivity for my client devices. For target hosts only accessible over IPv4 I will set up a DNS64 / NAT64 environment. I want to find out how much my usual activities are affected, for example browsing, checking email and gaming.


Requirements



  • If something breaks horribly, I want to be able to go back easily and quickly.
  • I only want to test the impact on client devices ("end user experience") - my infrastructure hosts should still be able to communicate over IPv4 where needed.


The plan


  • Set up NAT64 in a VM
  • Set up DNS64
  • Disable DHCPv4 and release all IPv4 addresses on my clients. I'm not going to actually disable their IPv4 stack; I don't care if Windows does automatic IPv4 shenanigans on the local network.


NAT64 setup



First, I created a new virtual machine on my KVM host and installed a standard CentOS 7 ("Infrastructure Server"). For the actual NAT64 translation I decided to install Jool. There are alternatives around, but this seemed to be the most current one.


There are no packages available for CentOS, but the installation is still pretty simple:


Prerequisites:

yum groupinstall "Development Tools"
yum install epel-release
yum install dkms libnl3-devel kernel-devel
wget https://github.com/NICMx/releases/raw/master/Jool/Jool-3.5.4.zip
unzip Jool-3.5.4.zip


Build the kernel module:

dkms install Jool-3.5.4


Build the userspace application:


cd Jool-3.5.4
autoconf.sh
./configure
make
make install


Then we can start the translation. For this I wrote a simple script:


cat nat64.sh
# enable routing
sysctl -w net.ipv4.conf.all.forwarding=1
sysctl -w net.ipv6.conf.all.forwarding=1

# disable offloading - see https://www.jool.mx/en/offloads.html
ethtool --offload eth0 gro off

# assign 64:ff9b::/96
/sbin/ip address add 64:ff9b::/96 dev eth0

# start jool
/sbin/modprobe jool pool6=64:ff9b::/96

# enable logging
jool --logging-bib=true
jool --logging-session=true


Two things to note:
  • I've assigned the standard range 64:ff9b::/96 to the NAT64 box - this is the suggested range, and required if you plan on using, for example, Google's DNS64 servers instead of your own. If you only roll your own DNS64, you could use a different range here.
  • The script above disables offloading in the VM - but it also needs to be done on the VM host. I didn't realise this at first and it resulted in horrible performance. I should have read the FAQ first …



Finally, once Jool is running, we also need to set up a route to this range. I could probably tinker with radvd on this box to announce the range directly, but it seemed easier to just set up a static route on my gateway (a Ubiquiti EdgeRouter), and this worked fine.


set protocols static route6 64:ff9b::/96 next-hop 2a02:999:1337:23::50


Now we should be able to reach IPv4 targets over IPv6 internally. You can simply test this by concatenating the IPv6 prefix above with the IPv4 address:


ping 64:ff9b::8.8.8.8
PING 64:ff9b::8.8.8.8(64:ff9b::808:808) 56 data bytes
64 bytes from 64:ff9b::808:808: icmp_seq=1 ttl=57 time=1.18 ms
64 bytes from 64:ff9b::808:808: icmp_seq=2 ttl=57 time=0.996 ms


DNS64 setup



Now that we have a working NAT64 gateway, we also need to tell the IPv6 client when to actually use it. The principle is simple: The client asks our DNS64 Resolver for the AAAA record of its target and our resolver will pass this query on. If it gets a positive answer it will pass it back to the client - the target is reachable with IPv6 directly and we don't need to involve NAT64. But if the server responds with NODATA our resolver will synthesise AAAA records itself based on the target's A records. The synthesised AAAA records point to the NAT64 IP range defined earlier.


For example, google.com has AAAA records, so these will be returned as-is. But nzz.ch does not. In this case the resolver takes the A record of nzz.ch (194.40.217.50) and builds the AAAA record 64:ff9b::c228:d932 (which corresponds to the IPv4-embedded notation 64:ff9b::194.40.217.50).
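
If you want to verify such a mapping yourself, a small shell one-liner (my own illustration, not part of the resolver setup) does the same arithmetic:

# embed the IPv4 address 194.40.217.50 into the 64:ff9b::/96 prefix
printf '64:ff9b::%02x%02x:%02x%02x\n' 194 40 217 50
# prints: 64:ff9b::c228:d932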


Google provides such a DNS64 service on the addresses 2001:4860:4860::6464 and 2001:4860:4860::64.
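
If you have IPv6 connectivity to Google's resolvers, you can watch the synthesis happen with the well-known IPv4-only name ipv4only.arpa (a quick check I'm adding for illustration):

dig aaaa ipv4only.arpa @2001:4860:4860::6464 +short
# expect synthesised addresses such as 64:ff9b::c000:aa and 64:ff9b::c000:ab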


Currently my clients are configured to point to a local dnsdist, a very flexible DNS load balancer. While it's admittedly a bit of overkill to have a load balancer in my LAN, dnsdist makes experiments like these super easy, because it allows me to simply switch between standard and DNS64 backends, or between different DNS64 implementations, without reconfiguring any of my clients. They will always just see the dnsdist IP as their resolver, which they got from SLAAC (radvd-options "RDNSS 2a02:999:1337:23::88 {};"). dnsdist also provides nice real-time graphs and inspection possibilities.



Behind my dnsdist I have a local PowerDNS Recursor, which we will now configure to do DNS64.


We copy the example lua config from the documentation and adapt it to use our 64:ff9b::/96 range. So our dns64.lua file looks like this:


-- this small script implements dns64 without any specials or customization
prefix = "64:ff9b::"


function nodata ( dq )
 if dq.qtype ~= pdns.AAAA then
   return false
 end  --  only AAAA records


 -- don't fake AAAA records if DNSSEC validation failed
 if dq.validationState == pdns.validationstates.Bogus then
    return false
 end


 dq.followupFunction = "getFakeAAAARecords"
 dq.followupPrefix = prefix
 dq.followupName = dq.qname
 return true
end


-- the ip6.arpa address is the reverse of the prefix address above
function preresolve ( dq )
 if dq.qtype == pdns.PTR and dq.qname:isPartOf(newDN("b.9.f.f.4.6.0.0.ip6.arpa")) then
   dq.followupFunction = "getFakePTRRecords"
   dq.followupPrefix = prefix
   dq.followupName = dq.qname
   return true
 end
 return false
end


We save the script in /etc/pdns-recursor/dns64.lua and then activate it in /etc/pdns-recursor/recursor.conf:


lua-dns-script=/etc/pdns-recursor/dns64.lua
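
For the change to take effect, restart the recursor. Assuming a systemd-managed install, the service is called pdns-recursor:

systemctl restart pdns-recursor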


Now we're ready for prime time and should be able to resolve IPv4-only targets. Let's test (from any box in my lan):


dig aaaa nzz.ch +short
64:ff9b::c228:d932


works!


Just to make sure, we want to test if IPv6 enabled targets still resolve correctly. They should *not* be rewritten to our 64:ff9b::/96 prefix!


dig aaaa google.ch +short
2a00:1450:400a:807::2003


all good!

Go live

To force the clients to use IPv6, I'm simply disabling the DHCPv4 server on my gateway and releasing the IPv4 addresses (on Windows: ipconfig /release, or disable IPv4 in the adapter settings).


To make sure they don't reach anything over IPv4 anymore:
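
A quick check of that kind from a Linux client could look like this (my own sketch; on Windows, ipconfig plus a ping would do the same):

# no global IPv4 address should be left on the client
ip -4 addr show
# and a direct ping to an IPv4 literal should now fail
ping -c 3 8.8.8.8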





At the same time I open a console on my NAT64 box and tail the logs to see what traffic gets NATed.
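
Jool logs via the kernel log, so on the NAT64 VM something along these lines should do (assuming journald; dmesg -w works too):

journalctl -kf | grep 'NAT64 Jool'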


Nov 03 14:33:01 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:1 (GMT) - Added session 2a02:999:1337:23::100#2090|64:ff9b::c228:d932#2090|192.168.23.50#64627|194.40.217.50#64627|ICMP
Nov 03 14:33:01 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:1 (GMT) - Mapped 2a02:999:1337:23::100#2090 to 192.168.23.50#64627 (ICMP)
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot session 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53261|64:ff9b::57ec:c857#443|192.168.23.50#64235|87.236.200.87#443|TCP
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53261 to 192.168.23.50#64235 (TCP)
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot session 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53259|64:ff9b::57ec:c857#443|192.168.23.50#64231|87.236.200.87#443|TCP
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53259 to 192.168.23.50#64231 (TCP)
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot session 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53260|64:ff9b::57ec:c857#443|192.168.23.50#64234|87.236.200.87#443|TCP
Nov 03 14:33:05 nat64 kernel: NAT64 Jool: 2017/11/3 13:33:5 (GMT) - Forgot 2a02:999:1337:1337:3c75:4fdc:8b1e:64c#53260 to 192.168.23.50#64234 (TCP)


All is fine...

I start browsing, reading mail… and you know what? Everything just works(™). As mentioned earlier, in my first attempt the performance was horrible, but after disabling offloads on my VM host this problem is gone. Browsing is fast and I don't notice any difference between IPv6 and IPv4-only websites. I'm testing video streaming sites as well, no issues. My roomie tries out her office VPN, Citrix and Skype calls; again, no issues there, even though the traffic gets NATed.


The only thing I notice is that I can't log in to my router's web GUI over IPv6 ("Unable to load router configuration") - but this is an internal problem in my LAN and would be fixable as well.


… until you want to play a game



Oh boy. Before I started the experiment I imagined that there might be some issues with games. But it's even worse than I thought.  First of all, GeForce Experience tells me that there is a new driver available. But it just can't download it ("Unable to connect to NVIDIA"). Well, no surprise there, this NVIDIA piece of s...oftware hasn't been a shining knight of bug freeness anyway. I can still download the drivers from the website at least.


Let's start Steam.






So... yeah, that doesn't look so great. Offline mode it is. A quick Google search shows this bug was reported four years ago already (DNS people: check by whom ;-) ). The report is for Steam on Linux, but Windows has the same issue.

The Ubisoft launcher is no better ("A Ubisoft service is not available at the moment"). Again, I can start Assassin's Creed Origins in offline mode, so there's at least that.

How about Blizzard? The Battle.net client starts fine, but can't update games. Overwatch does not even start ("unable to locate resources"), Hearthstone makes it at least to the main menu, but you can't enter a game.



The Epic Games launcher started fine the first time and Unreal Tournament can be fired up as well. It doesn't find any online games though. I re-enabled IPv4 quickly to test whether it finds games then (it does), and disabled IPv4 again. After that, the Epic launcher showed an error. A little later it worked again.





The Origin client sometimes works and sometimes doesn't ("You are offline"). Battlefield 1 can be started, but only the offline campaign is available.


At that point I gave up - IPv6-only and gaming do not mix (yet). Well, at least I can do some backseat gaming on twitch.tv (it works fine in the browser, but the desktop app seems to have problems displaying ads - which would be nice, except it also thinks the ad is showing and mutes the stream for eternity).



Conclusion



If it weren't for my addiction to occasionally harassing pixels, going IPv6-only in my network would be no problem. NAT64 and DNS64 work fine and are pretty easy to set up (assuming there is an existing dual-stack setup).


Dear game developers: you need to act now and start supporting IPv6. Forums are already starting to fill with complaints from people who can't play multiplayer games because they're behind CGNAT, and this will only get worse. This applies both to support on gaming consoles (no IPv6 support in the Switch? Nintendo, are you for real?) and to game service hosting.

Friday, August 11, 2017

How to fix GNOME Online Accounts timeout

Recently I had a power failure and it caused network instability. I have no idea how it affected the GNOME Online Accounts service, but the previously configured services kept saying "Timeout was reached". This is very annoying and frustrating, because the email account configured in my mail client, Evolution, also failed to get email.



I remembered I did many things to try to revert the situation I encountered. These include:
* reboot
* reconfigure the mail account in the mail client, Evolution
* delete and re-add the Google account in GNOME Online Accounts
* make sure the Gmail settings allow less secure app access, that is, not the two-step verification process

None of these reverted or fixed the situation, which was very worrying and frustrating because it impacted my work. I googled around and many suggestions were to kill the service and start it back up. The service mentioned is goa-daemon.
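
For reference, the suggested kill-and-restart amounts to roughly this (a sketch only; goa-daemon is D-Bus activated, so it gets re-spawned on demand after being killed):

# kill the GNOME Online Accounts daemon; it will be restarted
# automatically the next time something talks to it over D-Bus
pkill -f goa-daemon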



But this also did not work. Finally I managed to get it working. First, make sure hostname resolution works, and works quickly; the fastest way is to have your own DNS resolver running locally.

Second, remove the stored GNOME OAuth credential for your email account. How? For GNOME it is very easy: launch Seahorse and locate your configured email account, in this case my Gmail, then remove the entry.



Now relaunch your email client; if you have an email account configured, it should prompt again for the password. It works again for me, and I hope this helps you too.




Sunday, July 30, 2017

First try out of DateTieredCompactionStrategy

DateTieredCompactionStrategy was introduced during the Cassandra 2.0 series and is meant for time series data, like monitoring temperature over time or instrumenting device metrics over time. I tested this using Cassandra 3.0.11 and it worked really solidly.

 cqlsh:jw_schema1> desc table temperature;  
   
 CREATE TABLE jw_schema1.temperature (  
   weatherstation_id text,  
   event_time timestamp,  
   temperature text,  
   PRIMARY KEY (weatherstation_id, event_time)  
 ) WITH CLUSTERING ORDER BY (event_time ASC)  
   AND bloom_filter_fp_chance = 0.01  
   AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}  
   AND comment = ''  
   AND compaction = {'base_time_seconds': '3600', 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_sstable_age_days': '365', 'max_threshold': '32', 'min_threshold': '4', 'timestamp_resolution': 'SECONDS'}  
   AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}  
   AND crc_check_chance = 1.0  
   AND dclocal_read_repair_chance = 0.1  
   AND default_time_to_live = 0  
   AND gc_grace_seconds = 864000  
   AND max_index_interval = 2048  
   AND memtable_flush_period_in_ms = 0  
   AND min_index_interval = 128  
   AND read_repair_chance = 0.0  
   AND speculative_retry = '99PERCENTILE';  

Above is the table definition. Then I added some sample data:

 cqlsh:jw_schema1> insert into temperature (weatherstation_id, event_time, temperature) values ('1', '2017-03-07 20:38:20', '38');  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
   
 (2 rows)  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
   
 (2 rows)  
 cqlsh:jw_schema1> insert into temperature (weatherstation_id, event_time, temperature) values ('1', '2017-03-07 20:39:45', '36');  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
          1 | 2017-03-07 12:39:45+0000 |     36  
   
 (3 rows)  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
          1 | 2017-03-07 12:39:45+0000 |     36  
   
 (3 rows)  
   

and I went a little further by altering the compaction parameters:

   
 cqlsh:jw_schema1> ALTER TABLE jw_schema1.temperature WITH compaction = { 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'timestamp_resolution': 'MICROSECONDS', 'base_time_seconds': '10', 'max_sstable_age_days': '1' };  
 cqlsh:jw_schema1> desc table jw_schema1.temperature;  
   
 CREATE TABLE jw_schema1.temperature (  
   weatherstation_id text,  
   event_time timestamp,  
   temperature text,  
   PRIMARY KEY (weatherstation_id, event_time)  
 ) WITH CLUSTERING ORDER BY (event_time ASC)  
   AND bloom_filter_fp_chance = 0.01  
   AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}  
   AND comment = ''  
   AND compaction = {'base_time_seconds': '10', 'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_sstable_age_days': '1', 'max_threshold': '32', 'min_threshold': '4', 'timestamp_resolution': 'MICROSECONDS'}  
   AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}  
   AND crc_check_chance = 1.0  
   AND dclocal_read_repair_chance = 0.1  
   AND default_time_to_live = 0  
   AND gc_grace_seconds = 864000  
   AND max_index_interval = 2048  
   AND memtable_flush_period_in_ms = 0  
   AND min_index_interval = 128  
   AND read_repair_chance = 0.0  
   AND speculative_retry = '99PERCENTILE';  
   

and I triggered a nodetool flush and compacted the table; it worked solid, no exception nor error:

 user@localhost:/var/lib/cassandra/data/jw_schema1$ find temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Statistics.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/backups  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-CompressionInfo.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-TOC.txt  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Digest.crc32  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Index.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Filter.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Data.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Summary.db  
 user@localhost:/var/lib/cassandra/data/jw_schema1$   
 user@localhost:/var/lib/cassandra/data/jw_schema1$   
 user@localhost:/var/lib/cassandra/data/jw_schema1$ find temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Statistics.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/backups  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-CompressionInfo.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-TOC.txt  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Digest.crc32  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Index.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Filter.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Data.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Summary.db  
 user@localhost:/var/lib/cassandra/data/jw_schema1$ nodetool -h localhost flush jw_schema1 temperature  
 user@localhost:/var/lib/cassandra/data/jw_schema1$ find temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-CompressionInfo.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Summary.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Data.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Filter.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Statistics.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/backups  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Statistics.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Index.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-CompressionInfo.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-Digest.crc32  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-TOC.txt  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Digest.crc32  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Index.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Filter.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Data.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-10-big-TOC.txt  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-9-big-Summary.db  
 user@localhost:/var/lib/cassandra/data/jw_schema1$ nodetool -h localhost compact jw_schema1 temperature  
 user@localhost:/var/lib/cassandra/data/jw_schema1$ find temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Data.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Statistics.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Digest.crc32  
 temperature-0049c010ff6211e6b4aa1d269322be24/backups  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-TOC.txt  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Summary.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Filter.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-Index.db  
 temperature-0049c010ff6211e6b4aa1d269322be24/mc-11-big-CompressionInfo.db  
   

Then I inserted a value with a TTL of 5 seconds into the table, flushed and compacted; again, no exception nor error. Note how the TTL'd row disappears from the later selects once it expires.

 cqlsh:jw_schema1> insert into temperature (weatherstation_id, event_time, temperature) values ('1', '2017-03-07 20:54:59', '37') using ttl 5;  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
          1 | 2017-03-07 12:39:45+0000 |     36  
          1 | 2017-03-07 12:52:59+0000 |     37  
          1 | 2017-03-07 12:54:59+0000 |     37  
   
 (5 rows)  
 cqlsh:jw_schema1>   
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
          1 | 2017-03-07 12:39:45+0000 |     36  
          1 | 2017-03-07 12:52:59+0000 |     37  
   
 (4 rows)  
 cqlsh:jw_schema1> select * from temperature;  
   
  weatherstation_id | event_time        | temperature  
 -------------------+--------------------------+-------------  
          1 | 2017-03-06 16:00:00+0000 |     37  
          1 | 2017-03-07 12:38:20+0000 |     38  
          1 | 2017-03-07 12:39:45+0000 |     36  
          1 | 2017-03-07 12:52:59+0000 |     37  
   

That's it. If you plan to use this, better don't: Cassandra 3.8 deprecated DateTieredCompactionStrategy in favor of TimeWindowCompactionStrategy.
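
For completeness, here is a hedged sketch of what switching this table to TWCS could look like; the window unit and size below are illustrative values, not a recommendation:

 cqlsh -e "ALTER TABLE jw_schema1.temperature WITH compaction = {
   'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy',
   'compaction_window_unit': 'DAYS',
   'compaction_window_size': '1' };"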




Saturday, July 29, 2017

Reading into Apache Cassandra AntiEntropyService

Today we will take a look at the Apache Cassandra 1.2.19 AntiEntropyService class. First, let's get the source code of this class from GitHub.
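
If you want to follow along, grabbing the matching tag from the Apache mirror on GitHub should be enough; the path below is where the class lives in the 1.2 tree:

 git clone https://github.com/apache/cassandra.git
 cd cassandra
 git checkout cassandra-1.2.19
 less src/java/org/apache/cassandra/service/AntiEntropyService.java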

The class javadoc describes this class very well; it is well documented:

 AntiEntropyService encapsulates "validating" (hashing) individual column families,
 exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
 and then triggering repairs for disagreeing ranges.
 Every Tree conversation has an 'initiator', where valid trees are sent after generation
 and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree).
 Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
 for disagreeing ranges.
 Tree comparison and repair triggering occur in the single threaded Stage.ANTIENTROPY.
 The steps taken to enact a repair are as follows:
 1. A major compaction is triggered via nodeprobe:
   Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
     receives a TreeRequest, it will perform a readonly compaction to immediately validate
     the column family.
 2. The compaction process validates the column family by:
   Calling Validator.prepare(), which samples the column family to determine key distribution,
   Calling Validator.add() in order for every row in the column family,
   Calling Validator.complete() to indicate that all rows have been added.
     Calling complete() indicates that a valid MerkleTree has been created for the column family.
     The valid tree is returned to the requesting node via a TreeResponse.
 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
    rendezvous with / compare to:
   If the tree is local, it is cached, and compared to any trees that were received from neighbors.
   If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
     the remote tree is stored until a local tree can be generated.
   A Differencer object is enqueued for each comparison.
 4. Differencers are executed in Stage.ANTIENTROPY, to compare the two trees, and perform repair via the streaming api.

That is definitely a lot of operations involved in AntiEntropyService. Let's first identify all the classes:

  • Validator
  • ValidatorSerializer
  • TreeRequestVerbHandler
  • TreeResponseVerbHandler
  • CFPair
  • TreeRequest
  • TreeRequestSerializer
  • RepairSession
  • RepairJob
  • Differencer
  • TreeResponse
  • RepairFuture
  • RequestCoordinator
  • Order
  • SequentialOrder
  • ParallelOrder

There are 16 classes in total, and we can see that they correspond to what the javadoc describes above.
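
As the javadoc above notes, the whole machinery is kicked off from the operator side via nodeprobe, i.e. nodetool. A typical invocation, against a hypothetical keyspace and column family, would look something like this:

 # repair only the primary range of this node for one column family
 nodetool -h localhost repair -pr my_keyspace my_columnfamily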

 AntiEntropyService is a singleton service with four statuses: started, session_success, session_failed and finished. An important method is submitRepairSession:
   /**  
    * Requests repairs for the given table and column families, and blocks until all repairs have been completed.  
    *  
    * @return Future for asynchronous call or null if there is no need to repair  
    */  
   public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)  
   {  
     RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);  
     if (session.endpoints.isEmpty())  
       return null;  
     RepairFuture futureTask = session.getFuture();  
     executor.execute(futureTask);  
     return futureTask;  
   }  

where a new repair session is created and run by the executor. Another static method, getNeighbors(), returns the neighbors that share the provided range:

   /**  
    * Return all of the neighbors with whom we share the provided range.  
    *  
    * @param table table to repair  
    * @param toRepair token to repair  
    * @param isLocal need to use only nodes from local datacenter  
    *  
    * @return neighbors with whom we share the provided range  
    */  
   static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)  
   {  
     StorageService ss = StorageService.instance;  
     Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);  
     Range<Token> rangeSuperSet = null;  
     for (Range<Token> range : ss.getLocalRanges(table))  
     {  
       if (range.contains(toRepair))  
       {  
         rangeSuperSet = range;  
         break;  
       }  
       else if (range.intersects(toRepair))  
       {  
         throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");  
       }  
     }  
     if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))  
       return Collections.emptySet();  
     Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));  
     neighbors.remove(FBUtilities.getBroadcastAddress());  
     if (isLocal)  
     {  
       TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();  
       Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));  
       return Sets.intersection(neighbors, localEndpoints);  
     }  
     return neighbors;  
   }  

Next, a static Validator class, which implements the Runnable interface, has the following javadoc description:

 A Strategy to handle building and validating a merkle tree for a column family.
 Lifecycle:
 1. prepare() - Initialize tree with samples.
 2. add() - 0 or more times, to add hashes to the tree.
 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
And the three important methods are:

   public void prepare(ColumnFamilyStore cfs)  
     {  
       if (!tree.partitioner().preservesOrder())  
       {  
         // You can't beat an even tree distribution for md5  
         tree.init();  
       }  
       else  
       {  
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();  
         for (DecoratedKey sample : cfs.keySamples(request.range))  
         {  
           assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;  
           keys.add(sample);  
         }  
         if (keys.isEmpty())  
         {  
           // use an even tree distribution  
           tree.init();  
         }  
         else  
         {  
           int numkeys = keys.size();  
           Random random = new Random();  
           // sample the column family using random keys from the index  
           while (true)  
           {  
             DecoratedKey dk = keys.get(random.nextInt(numkeys));  
             if (!tree.split(dk.token))  
               break;  
           }  
         }  
       }  
       logger.debug("Prepared AEService tree of size " + tree.size() + " for " + request);  
       ranges = tree.invalids();  
     }  
     /**  
      * Called (in order) for every row present in the CF.  
      * Hashes the row, and adds it to the tree being built.  
      *  
      * There are four possible cases:  
      * 1. Token is greater than range.right (we haven't generated a range for it yet),  
      * 2. Token is less than/equal to range.left (the range was valid),  
      * 3. Token is contained in the range (the range is in progress),  
      * 4. No more invalid ranges exist.  
      *  
      * TODO: Because we only validate completely empty trees at the moment, we  
      * do not bother dealing with case 2 and case 4 should result in an error.  
      *  
      * Additionally, there is a special case for the minimum token, because  
      * although it sorts first, it is contained in the last possible range.  
      *  
      * @param row The row.  
      */  
     public void add(AbstractCompactedRow row)  
     {  
       assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;  
       assert lastKey == null || lastKey.compareTo(row.key) < 0  
           : "row " + row.key + " received out of order wrt " + lastKey;  
       lastKey = row.key;  
       if (range == null)  
         range = ranges.next();  
       // generate new ranges as long as case 1 is true  
       while (!range.contains(row.key.token))  
       {  
         // add the empty hash, and move to the next range  
         range.addHash(EMPTY_ROW);  
         range = ranges.next();  
       }  
       // case 3 must be true: mix in the hashed row  
       range.addHash(rowHash(row));  
     }  
     private MerkleTree.RowHash rowHash(AbstractCompactedRow row)  
     {  
       validated++;  
       // MerkleTree uses XOR internally, so we want lots of output bits here  
       MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");  
       row.update(digest);  
       return new MerkleTree.RowHash(row.key.token, digest.digest());  
     }  
     /**  
      * Registers the newly created tree for rendezvous in Stage.ANTIENTROPY.  
      */  
     public void complete()  
     {  
       completeTree();  
       StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);  
       logger.debug("Validated " + validated + " rows into AEService tree for " + request);  
     }  

Then we read that there is an inner static class, ValidatorSerializer, with two important methods, serialize and deserialize, which mainly serialize (or deserialize) the tree request and the Merkle tree. The next two classes, TreeRequestVerbHandler and TreeResponseVerbHandler, are pretty trivial; they handle requests and responses from remote nodes.

Then there is another simple class, CFPair, followed by another important class, TreeRequest, with the method createMessage(). Just like ValidatorSerializer, TreeRequestSerializer also has two methods, serialize and deserialize.

The next class, RepairSession, is pretty important; its javadoc says:

Triggers repairs with all neighbors for the given table, cfs and range.
Typical lifecycle is: start() then join(). Executed in client threads.
The important method runMayThrow() describes how a repair job is performed:

     // we don't care about the return value but care about it throwing exception  
     public void runMayThrow() throws Exception  
     {  
       logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));  
       if (endpoints.isEmpty())  
       {  
         differencingDone.signalAll();  
         logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));  
         return;  
       }  
       // Checking all nodes are live  
       for (InetAddress endpoint : endpoints)  
       {  
         if (!FailureDetector.instance.isAlive(endpoint))  
         {  
           String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);  
           differencingDone.signalAll();  
           logger.error(String.format("[repair #%s] ", getName()) + message);  
           throw new IOException(message);  
         }  
         if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11 && isSequential)  
         {  
           logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint));  
           return;  
         }  
       }  
       AntiEntropyService.instance.sessions.put(getName(), this);  
       Gossiper.instance.register(this);  
       FailureDetector.instance.registerFailureDetectionEventListener(this);  
       try  
       {  
         // Create and queue a RepairJob for each column family  
         for (String cfname : cfnames)  
         {  
           RepairJob job = new RepairJob(cfname);  
           jobs.offer(job);  
           activeJobs.put(cfname, job);  
         }  
         jobs.peek().sendTreeRequests();  
         // block whatever thread started this session until all requests have been returned:  
         // if this thread dies, the session will still complete in the background  
         completed.await();  
         if (exception == null)  
         {  
           logger.info(String.format("[repair #%s] session completed successfully", getName()));  
         }  
         else  
         {  
           logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);  
           throw exception;  
         }  
       }  
       catch (InterruptedException e)  
       {  
         throw new RuntimeException("Interrupted while waiting for repair.");  
       }  
       finally  
       {  
         // mark this session as terminated  
         terminate();  
         FailureDetector.instance.unregisterFailureDetectionEventListener(this);  
         Gossiper.instance.unregister(this);  
         AntiEntropyService.instance.sessions.remove(getName());  
       }  
     }  

The next two classes, nested in RepairSession, are RepairJob and Differencer, which contain the details of calculating the differences between trees and then performing a repair. The remaining classes are pretty trivial.