Events Feed with Cassandra

For one of our event feeds, we wanted a high-throughput system that keeps collecting various types of event details for a user and presents them in one concise form. This section would act as a summary from which users can take the requisite actions.

So the requirement was a system with high read throughput, more inserts than updates and deletes, records with varying fields, results sorted by time, read and unread statuses, and no 'search' requirement. Since search was not a criterion, indexes like Solr/ES were not candidates; NoSQL databases like CouchDB, Cassandra and DynamoDB were. We weren't going with AWS, so DynamoDB was eliminated. As we started understanding Cassandra, we got hooked on it and gave it a try. It fit our requirement fully.

As practitioners, we have attempted to provide in this blog everything necessary to start using Cassandra.

What is Cassandra?
Apache Cassandra is a highly available, scalable wide-column database that delivers high throughput in terms of reads and writes. It is linearly scalable with no single point of failure and supports multi-data-center replication.

Cassandra is designed around the trade-offs of the CAP theorem (Consistency, Availability and Partition tolerance): it favors availability and partition tolerance and offers tunable consistency. So you can't expect the full ACID guarantees of an RDBMS (e.g., MySQL), especially multi-statement transactions with rollback and strict isolation. This means that if you are looking at Cassandra as a database to handle critical transactions like payments, it is not the right choice.

Architecture:

Node & Cluster

A running instance of Cassandra is called a node, and two or more nodes form a cluster (the Cassandra ring). There is no leader / master in a cluster. Any node can act as the coordinator node and respond to a client's read and write requests.

Another important consideration is the primary key of your table. Records are placed on nodes based on a hash of the partition key, which is the first part of the primary key. When a write request reaches a node, a token (hash) is generated from the partition key. If the generated token is not part of the current node's range, the record is forwarded to the node that owns that range.

image


Illustration:
The image represents an eight-node cluster in which a keyspace (db) was created with replication factor 3. Node 8 got an insert request from a client, so it acts as the coordinator node and identifies node 3 as the node that should keep this record. Cassandra therefore passes the write request to node 3 and, since the replication factor is 3, keeps copies of the same data on two other nodes, 4 and 5.
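To make the illustration concrete, here is a minimal Python sketch of how a coordinator could pick the replicas for a key. It is only a toy model: the 32-bit token space, the MD5 hash, one evenly spaced token per node and the names token_for, build_ring and replicas_for are all our assumptions, not Cassandra's actual implementation.

```python
from bisect import bisect_left
import hashlib

TOKEN_SPACE = 2 ** 32  # small illustrative token space, not Cassandra's real range


def token_for(partition_key: str) -> int:
    """Hash a partition key to a token (MD5 is a stand-in for the real partitioner)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def build_ring(node_names):
    """Give each node one evenly spaced token; a node owns the tokens up to its own."""
    step = TOKEN_SPACE // len(node_names)
    return [((i + 1) * step - 1, name) for i, name in enumerate(node_names)]


def replicas_for(ring, token, replication_factor):
    """The owner is the first node whose token >= the key's token; then walk clockwise."""
    tokens = [t for t, _ in ring]
    start = bisect_left(tokens, token) % len(ring)
    return [ring[(start + i) % len(ring)][1] for i in range(replication_factor)]


# Eight nodes and replication factor 3, as in the illustration above.
ring = build_ring([f"node{i}" for i in range(1, 9)])
replicas = replicas_for(ring, token_for("some-event-id"), 3)
```

Whichever node receives the request can run the same calculation, which is why any node can act as the coordinator. A token that falls in node 3's range yields node 3, node 4 and node 5, matching the picture.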

Gossip Protocol:
Gossip is a protocol through which nodes exchange their state information. In each gossip round, a node communicates with up to three other nodes in the cluster.
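The following Python sketch shows why talking to only three peers per round is enough for news to spread quickly. It is a deliberately simplified push-only simulation; the function name gossip_rounds, the random peer selection and the fixed seed are our assumptions, and real gossip also exchanges versioned state in both directions.

```python
import random


def gossip_rounds(node_count: int, fanout: int = 3, seed: int = 7) -> int:
    """Return how many rounds pass until every node has heard one piece of news."""
    rng = random.Random(seed)
    informed = {0}  # node 0 is the only one that knows the new state
    rounds = 0
    while len(informed) < node_count:
        rounds += 1
        newly_informed = set()
        for node in informed:
            # Every informed node tells up to `fanout` randomly chosen peers.
            peers = [n for n in range(node_count) if n != node]
            newly_informed.update(rng.sample(peers, min(fanout, len(peers))))
        informed |= newly_informed
    return rounds


rounds_for_100_nodes = gossip_rounds(100)
```

Since the number of informed nodes can roughly quadruple in every round, the number of rounds grows slowly as the cluster grows.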

Seed Node:
When we set up a Cassandra cluster, one or more nodes are designated as 'seed nodes' in cassandra.yaml. A seed node does the normal work like any other node and also has a few extra functions:
a. It is the node that a new node contacts when it gets added to the cluster.
b. When another node is recovering from a failure, it contacts a seed node to catch up on the cluster state it missed.

Snitches:
A snitch determines which data centers and racks the nodes belong to. Snitches inform Cassandra about the network topology so that requests are routed efficiently, and they allow Cassandra to distribute replicas by grouping machines into data centers and racks.

Cassandra supports different types of snitches: dynamic snitching, SimpleSnitch, RackInferringSnitch, PropertyFileSnitch, GossipingPropertyFileSnitch, Ec2Snitch, Ec2MultiRegionSnitch, GoogleCloudSnitch and CloudstackSnitch.

If you are deploying in one DC, then SimpleSnitch is sufficient; Ec2Snitch is ideal when you host your nodes in a single Amazon EC2 region.

Partitioner:

The partitioner determines how data is distributed across the nodes. A partitioner is a function for deriving a token representing a row from its partition key, typically by hashing. Each row of data is then distributed across the cluster by the value of the token.

Cassandra supports the following partitioners

Murmur3Partitioner: Uniformly distributes data across the cluster based on MurmurHash hash values. This is the default; it is set in the cassandra.yaml file and has to be chosen before the cluster holds data.

RandomPartitioner: Uniformly distributes data across the cluster based on MD5 hash values.

ByteOrderedPartitioner: Keeps an ordered distribution of data lexically by key bytes.
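The practical difference between hashed and ordered partitioning shows up with keys that share a prefix. The Python sketch below is a rough model under our own assumptions: four nodes, MD5 reduced with a modulo instead of real token ranges, and a byte-ordered split on the first byte only. The helper names are hypothetical.

```python
import hashlib
from collections import Counter

NODES = 4


def hashed_node(key: str) -> int:
    """Hash-based placement: MD5 of the key, reduced to a node index."""
    token = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")
    return token % NODES


def byte_ordered_node(key: str) -> int:
    """Order-preserving placement: split the key space by the first byte."""
    first_byte = key.encode("utf-8")[0]
    return first_byte * NODES // 256


# Ten thousand keys that all start with the same prefix.
keys = [f"event-{i:05d}" for i in range(10_000)]
hashed_spread = Counter(hashed_node(k) for k in keys)
ordered_spread = Counter(byte_ordered_node(k) for k in keys)
```

In this model hashed_spread puts a share of the keys on every node, while ordered_spread sends every key to the same node because they all begin with the same byte. That hot-spot risk is the usual reason to stay with a hashing partitioner.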

Writes in Cassandra:

A write operation consists of 3 parts:

Commit Log: Every write request is first written to the commit log file. This file is replayed when a node recovers from a failure.

MemTable: The request then inserts the record into the MemTable (RAM). Once the record is in the commit log and the MemTable, the write is considered successful.

SSTable: When the MemTable reaches its memory threshold, its data is flushed to an SSTable (disk). At this point the corresponding commit log records are also removed.

Based on the replication factor, a copy of the same data is replicated to one or more other nodes.
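The write path on a single node can be sketched in a few lines of Python. This is a toy model, not Cassandra code: the Node class, the row-count flush threshold and the in-memory lists standing in for files are all our assumptions.

```python
class Node:
    """Toy single-node write path: commit log -> memtable -> SSTable flush."""

    def __init__(self, memtable_limit: int = 3):
        self.memtable_limit = memtable_limit  # flush threshold in rows (hypothetical)
        self.commit_log = []  # append-only list standing in for the on-disk log
        self.memtable = {}    # in-memory key -> value
        self.sstables = []    # each flush produces one immutable, sorted "file"

    def write(self, key, value) -> bool:
        self.commit_log.append((key, value))  # 1. append to the commit log first
        self.memtable[key] = value            # 2. then update the in-memory table
        if len(self.memtable) >= self.memtable_limit:
            self.flush()                      # 3. threshold reached, go to disk
        return True                           # acknowledged after steps 1 and 2

    def flush(self) -> None:
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable.clear()
        self.commit_log.clear()  # flushed data no longer needs replaying

    def recover(self) -> None:
        """After a crash the memtable is lost; rebuild it from the commit log."""
        self.memtable = dict(self.commit_log)
```

Writes stay fast because nothing on disk is updated in place: the commit log is only appended to, and SSTables are written once and never modified.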

Reads in Cassandra:

Once a read request is received by a coordinator node, it computes the token for the given partition key and passes the request on to the node(s) that hold that token range.

The node that holds the data fetches it from the Memtable and the SSTables, merges the results and sends them back to the coordinator node, which then responds to the client.

The Bloom filter, partition key cache, partition summary and partition index help in retrieving the data quickly from the SSTables; the Bloom filter, for instance, tells the node which SSTables cannot contain the requested partition so that they can be skipped.
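Here is a small Python sketch of the replica-side read: check the Memtable, skip SSTables whose Bloom filter rules the key out, and return the most recently written version. The BloomFilter and SSTable classes, the filter size and the use of SHA-256 are our assumptions for illustration; Cassandra's real structures are more involved.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: may wrongly say 'maybe present', never wrongly 'absent'."""

    def __init__(self, size: int = 256, hashes: int = 3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, key: str):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class SSTable:
    def __init__(self, rows):
        self.rows = dict(rows)  # key -> (write_timestamp, value)
        self.bloom = BloomFilter()
        for key in self.rows:
            self.bloom.add(key)


def read(key, memtable, sstables):
    """Collect every version of the key and return the most recently written one."""
    versions = []
    if key in memtable:
        versions.append(memtable[key])
    for table in sstables:
        # Only touch an SSTable when its Bloom filter says the key may be there.
        if table.bloom.might_contain(key) and key in table.rows:
            versions.append(table.rows[key])
    if not versions:
        return None
    return max(versions, key=lambda version: version[0])[1]
```

The merge step is needed because the same key can live in the Memtable and in several SSTables at once; the write timestamp decides which version wins.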

Tunable consistency:

Cassandra allows us to set consistency levels for both reads and writes. If we set the consistency level of a write request to THREE, then three replica nodes have to acknowledge it for the write to succeed. Other common levels are ONE, QUORUM and ALL. We have to use higher levels carefully, as they are more resource intensive and add latency.
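A common rule of thumb when choosing levels is that a read is guaranteed to see the latest write when the read and write replica sets overlap, i.e. when R + W > RF. The Python sketch below encodes that arithmetic. The level names ONE, TWO, THREE, QUORUM and ALL are real Cassandra consistency levels, while the helper function names are ours.

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas that form a majority."""
    return replication_factor // 2 + 1


def required_acks(level: str, replication_factor: int) -> int:
    """Replica acknowledgements the coordinator waits for at a given level."""
    levels = {
        "ONE": 1,
        "TWO": 2,
        "THREE": 3,
        "QUORUM": quorum(replication_factor),
        "ALL": replication_factor,
    }
    return levels[level]


def reads_see_latest_write(write_level: str, read_level: str, rf: int) -> bool:
    """Read and write replica sets must overlap: R + W > RF."""
    return required_acks(read_level, rf) + required_acks(write_level, rf) > rf
```

With a replication factor of 3, QUORUM writes plus QUORUM reads satisfy the rule (2 + 2 > 3), whereas ONE plus ONE does not, which is the price paid for the lower latency.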

KeySpace:

A keyspace is a named collection that holds one or more tables (column families); it can be compared to a database in MySQL.

CREATE KEYSPACE events_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}; 

class: can have 2 values, "SimpleStrategy" and "NetworkTopologyStrategy".

SimpleStrategy: Use this if you have all your nodes in one data center.

NetworkTopologyStrategy: Use this if you have your nodes across many data centers.

replication_factor: If we use SimpleStrategy, we have to specify how many replicas we need. If we use NetworkTopologyStrategy, we need to specify the replication factor per DC, i.e., how many replicas each DC should hold.

CREATE KEYSPACE "employee" WITH REPLICATION = {'class' : 'NetworkTopologyStrategy','boston' : 3 , 'seattle' : 2 ,'tokyo' : 2}; 
// boston, seattle and tokyo are DC names, which have to match the names defined in the Cassandra configuration file cassandra-rackdc.properties.

Table:

Cassandra's throughput depends on the way you define a table. You have to think backwards, i.e., you have to think about how you are going to update records and how you are going to filter results before you design the table.

CREATE TABLE events_ks.eventlist (eventid UUID, eventname text, eventdate timestamp, eventcategory text, eventduration int,
PRIMARY KEY (eventid, eventdate))
WITH CLUSTERING ORDER BY (eventdate DESC);


This table was created to store events, and the events will be displayed in the order of latest eventdate first.

You may be wondering why 2 fields are given as the primary key.

The first field is known as the partition key and the second one as the clustering key.

The partition key identifies the node on which the data has to be stored, and the clustering key is used to store the data within a partition in a particular order; in our case the data will be stored in descending order of eventdate.
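One way to picture the two parts of the primary key is as a map of maps: the partition key selects a partition, and the clustering key orders the rows inside it. The Python sketch below is only a mental model. The Table class and the "user-1" partition key are hypothetical, and sorting happens at read time here for brevity, whereas Cassandra keeps rows sorted on disk.

```python
from collections import defaultdict


class Table:
    """Toy table: partition key -> {clustering key -> row}."""

    def __init__(self):
        self.partitions = defaultdict(dict)

    def upsert(self, partition_key, clustering_key, row) -> None:
        # Writing the same (partition key, clustering key) again overwrites the row.
        self.partitions[partition_key][clustering_key] = row

    def select(self, partition_key, limit=None):
        """Return one partition's rows in descending clustering-key order."""
        rows = self.partitions.get(partition_key, {})
        ordered = [rows[key] for key in sorted(rows, reverse=True)]
        return ordered[:limit]


events = Table()
events.upsert("user-1", 20240101, "Tech Meet")
events.upsert("user-1", 20240315, "Hackathon")
events.upsert("user-1", 20240210, "Webinar")
latest_two = events.select("user-1", limit=2)
```

A query that names the partition key touches a single partition and gets its rows back already ordered, which is what makes this layout a good fit for a "latest first" feed.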

Both the partition key and the clustering key can be composite, i.e., we can give more than one field.

PRIMARY KEY ((eventid,eventcategory),eventdate,eventduration)

Note that we have to define the order for each and every clustering key; otherwise Cassandra will insist that you specify it.

Your update queries should have the partition key and the clustering key as part of the WHERE clause.

Can't I query without specifying the primary key in the WHERE clause?

Yes you can, by adding "ALLOW FILTERING", but the query will be executed across all nodes to get the result. This is not advisable, especially when you are dealing with large volumes of data. ALLOW FILTERING is a must even if you filter on a regular (non-primary-key) field.

SELECT eventname, eventdate FROM events_ks.eventlist WHERE eventcategory = 'Tech Meet' ALLOW FILTERING;

Field Index:

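Cassandra supports field-level (secondary) indexing, like MySQL. The index is created at the node level, i.e., each node indexes only its own data. With the index in place, an equality query on the indexed field no longer needs "ALLOW FILTERING", but it still has to consult many nodes, so it gives only a little boost in query performance.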

CREATE INDEX idx1 ON events_ks.eventlist (eventcategory); 


The above query will create an index on the field eventcategory.

Materialized View:

Materialized views are one of the cool features that Cassandra offers. In our eventlist example we have defined the schema in such a way that, whenever we query, we get the results in descending order of event-date.

If we want to get the results in alphabetical order of event-category, we can't, because we have not designed the table that way.

This is where materialized views come in handy. All you have to do is create a materialized view based on the existing table eventlist, specifying the field on which we want the results to be sorted.

CREATE MATERIALIZED VIEW events_ks.eventlist_mvecat
AS SELECT * FROM events_ks.eventlist WHERE eventid IS NOT NULL AND eventdate IS NOT NULL AND eventcategory IS NOT NULL
PRIMARY KEY (eventid, eventcategory, eventdate)
WITH CLUSTERING ORDER BY (eventcategory ASC, eventdate DESC);


The moment you execute this command, the data from the base table is automatically loaded into this materialized view in the order we want. New additions or updates happening in the base table are automatically synced to this materialized view as well.

Does it sound like data duplication? Yes, it is. In the Cassandra world, data duplication is good.

The one constraint here is that the view's primary key can include only one new field that is not part of the base table's partition and clustering key specification.

Tombstones in Cassandra:

This is one of the key things that people tend to miss while looking at Cassandra. When we try to delete a record, Cassandra does not actually delete it; instead it creates a tombstone that marks the record as deleted. While querying, these records are also read, but they are filtered out before the results are sent to the client.

If the tombstone count goes beyond 1000 for a query, Cassandra starts logging warnings, and if the count goes beyond 100,000, it stops executing the query. These numbers are configurable through the parameters tombstone_warn_threshold and tombstone_failure_threshold in cassandra.yaml.
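The effect of tombstones on a read can be sketched in Python as follows. The two constants mirror the default thresholds mentioned above; the TOMBSTONE marker, the scan function and the TombstoneOverwhelmingError class are our own stand-ins, not Cassandra's internals.

```python
import warnings

TOMBSTONE = object()         # marker stored in place of a deleted value
WARN_THRESHOLD = 1_000       # mirrors tombstone_warn_threshold
FAILURE_THRESHOLD = 100_000  # mirrors tombstone_failure_threshold


class TombstoneOverwhelmingError(Exception):
    """Hypothetical error for a query that scanned too many tombstones."""


def delete(partition: dict, key) -> None:
    """A delete is just another write: it stores a marker, it removes nothing."""
    partition[key] = TOMBSTONE


def scan(partition: dict, warn_at=WARN_THRESHOLD, fail_at=FAILURE_THRESHOLD):
    """Read a partition, skipping tombstones but still paying to walk over them."""
    live, tombstones = [], 0
    for key in sorted(partition):
        value = partition[key]
        if value is TOMBSTONE:
            tombstones += 1
            if tombstones > fail_at:
                raise TombstoneOverwhelmingError(f"scanned {tombstones} tombstones")
            continue
        live.append((key, value))
    if tombstones > warn_at:
        warnings.warn(f"read {len(live)} live rows and {tombstones} tombstones")
    return live
```

The client only ever sees the live rows, which is why a delete-heavy workload can slow reads down without any visible change in the results.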

When will these tombstone records be removed?

They are removed when compaction runs, once the tombstones are older than the table's gc_grace_seconds (10 days by default). Though Cassandra does automatic compaction, it is good to keep a tab on the tombstone warnings in the Cassandra logs and act accordingly. Cassandra supports many compaction strategies, and the default one is "Size-Tiered".

Compaction is a heavy task that requires a lot of compute power. Hence, if you want to try a manual compaction, do it during off-peak hours.
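In essence, compaction merges several SSTables into one, keeps only the newest version of each key and drops tombstones that are old enough. The Python sketch below assumes SSTables are plain dicts and uses None as the tombstone marker. The grace period corresponds to the table option gc_grace_seconds, which defaults to ten days; the compact function itself is a hypothetical illustration and ignores how a strategy decides which SSTables to merge.

```python
GC_GRACE_SECONDS = 864_000  # ten days, the default gc_grace_seconds


def compact(sstables, now, gc_grace=GC_GRACE_SECONDS):
    """Merge SSTables into one, keeping the newest cell per key.

    Each SSTable is a dict: key -> (write_time, value); value None marks a tombstone.
    """
    merged = {}
    for table in sstables:
        for key, (written_at, value) in table.items():
            if key not in merged or written_at > merged[key][0]:
                merged[key] = (written_at, value)
    # Tombstones survive until the grace period has passed, so that replicas
    # which missed the delete can still learn about it.
    return {
        key: cell
        for key, cell in merged.items()
        if not (cell[1] is None and now - cell[0] > gc_grace)
    }
```

Reading and rewriting whole SSTables is what makes compaction expensive in disk I/O and CPU, hence the advice to run manual compactions when traffic is low.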

Configuration or Fine Tuning: The out-of-the-box configuration holds good for most use cases. Tombstone settings and heap settings are a few of the things that have to be looked at based on your usage.

Monitoring Tools:

Apache Cassandra doesn't have a GUI to see the metrics, but it comes with a tool called "nodetool", which is very useful for monitoring Cassandra clusters.

A few useful commands are:

nodetool tablehistograms -- <keyspace> <table>
nodetool tablestats -H keyspacename.tablename
nodetool netstats -H
nodetool proxyhistograms
nodetool status ( -r | --resolve-ip ) -- <keyspace>
nodetool -h <host> tpstats

You can use graphical tools like Grafana to build dashboards from the data collected from nodetool.

Resources:

http://cassandra.apache.org/

https://docs.datastax.com/en/cassandra/3.0/index.html

https://www.ibm.com/developerworks/library/ba-set-up-apache-cassandra-architecture/index.html