Cassandra is a distributed storage system that focusses on providing a highly scalable & available service for storing very large amount of data. As part of this paper we look into how it was developed at Facebook to handle inbox search problem. Cassandra was built with a goal to handle high write traffic without a penalty to performance of read traffic.
What is inbox search problem?
Inbox search is a feature that allows users to search their inbox. As a core requirement, user should be able to list down all the messages that consist of the search query that they have entered during the search. User also has an option to list down messages from the inbox which consists of their interaction with another user.
At Facebook scale, this means handling very high write throughput of incoming messages and allowing retrieval of data from the persisted messages. As Facebook has a global user-base this meant that the data needs to be replicated across data centers in order to have a low latency for search request. To tackle these challenges Facebook ended up building a storage system from ground-up that solves all the above problems at scale.
The need for new storage system
Existing storage systems guarantee strong consistency but this guarantee comes at the cost of scalability & availability. In case of a network partition, these kind of storage systems are not useful as they have a core requirement to maintain consistency.
On the other hand systems such as Amazon’s Dynamo allow read/write operations to go through even in case of network partitions. Although dynamo’s data model requires read operation to be performed for write operation as it relies on vector clocks for managing conflicts. This can become a bottleneck at Facebook scale which comes with very high write throughput. Cassandra is built by coupling dynamo’s storage & replication techniques with Google’s BigTable data model & compaction techniques.
Data model & API
A table in Cassandra is a distributed multi dimensional map indexed by a key. Value is a structured object. Cassandra provides two kinds of column variations to the users:
- Simple column family
- Super column family (Column family within a column family)
For example a key will have have multiple columns each of which can be a simple column in form of key-value pair or nested columns also known as super column. This symbolizes a typical nested map structure where each value can either be a normal value or a map on its own.
The data model also provides functionality to sort column based on timestamp or name. Cassandra exposes its functionality to clients with below 3 API functions:
System Architecture
Building Cassandra comes with typical challenges which are part of building any storage system. In addition to these challenges there are some core distributed system concepts that make Cassandra standout. For handling write request, the request is routed to one of the nodes which in turn sends it to the replica nodes. After getting a consensus from replica nodes, write is processed. For reads, request is either routed to closest replica or to multiple replicas to get the quorum based on the consistency requirements specified by the client. Let’s dive into the concepts one by one and see what separates Cassandra from other storage systems.
Partitioning
In order to scale for a write-heavy traffic, Cassandra comes with an ability to dynamically partition data across multiple nodes. In order to perform the partitioning, Cassandra leverages Consistent Hashing. The basic implementation of consistent hashing algorithm comes with a set of challenges:
- As nodes are assigned randomly on the ring, it can lead to scenarios of uneven load distribution.
- Also the algorithm doesn’t considers the performance of nodes which can lead to hot-spots in term of request traffic.
Dynamo addressed this problem by assigning a node at multiple positions in the ring also known as virtual nodes. Cassandra pick an approach where nodes with light load moves on the ring to share the burden of node with heavy load.
Replication
Replication is necessary in case of Cassandra as it aims to run the storage system on commodity hardware where replication is the norm. Each data entry is replicated to N
(Replication factor) nodes in the system. Each key is assigned to a coordinator node which takes the responsibility of replicating it to N - 1
nodes. There are various options exposed to the client on which the replication methodology is decided such as “Rack aware”, “Rack unaware” etc.
This process of replicating and keeping track of replication load on the nodes is maintained by a leader node which is appointed by Zookeeper. Any node that joins the cluster contacts the leader to know their replication range i.e. what range of keys they are going to replicate. This information is stored both in the node as well as Zookeeper to accommodate for node failure. This set of N
nodes that are responsible for keys in a range are known as preference list for the range. This terminology comes from Dynamo.
Every node int the cluster is aware of every other node in the cluster so they are aware of the range these nodes are responsible for. The preference list is configured such that the nodes are spread across multiple data centers to cater for data center failure. So data is accessible even if one complete data center goes down.
Membership
Node membership in Cassandra is based upon an anti-entropy based Gossip mechanism called Scutte-butt. In Cassandra, membership protocol is not only used for membership but also to transmit system related state across nodes.
Cassandra doesn’t uses a typical format of membership protocol that provides a boolean value for checking if a node is available or not. It uses a mechanism to maintain suspicion level of nodes. This is based upon Accrual Failure Detector in a modified form which computes a value of Φ. Φ in this case symbolizes the probability of error while suspecting a node for failure. As part of this, each node in the cluster maintains a sliding window of arrival times of gossip messages from other nodes. This data is used to calculate Φ. This approach has turned out to be accurate and more CPU efficient.
Bootstrapping
A new node joining the Cassandra cluster is assigned a random token that determines its position in consistent hashing ring. The token is persisted on disk as well as to the Zookeeper for fault tolerance. This information is also gossiped across other nodes so that all nodes are aware of other nodes in the cluster. Cassandra initiates some of these positions as seeds of cluster that map to the token. A new node picks up any of these seeds from the configuration service like Zookeeper.
At large scale data processing systems such as the ones at Facebook, nodes can be marked as unavailable due to temporary outages due to network lag, garbage collection events etc. With such large scale data, rebalancing of nodes with temporary outages will lead to crippling the storage system. Such scenarios can lead to automatic node assignment to replace the temporarily failed node. In order to avoid such scenarios all messages contain cluster name of the Cassandra instance. Doing this allows to stop the nodes that were brought up due to false negative node failure events.
Scaling
In order to tackle scenarios where a node in ring is overloaded, new nodes are tasked with sharing the load of heavy nodes. This sharing process can be initiated by any other node or through a CLI or web interface. The node with heavy load will transfer data to new node with kernel-kernel copy technique. This process can be sped up by parallelizing the process of data copy and efforts were made towards this at the time this paper was written.
Persistence
A write operation in Cassandra leads to writing in commit log followed by a write to in-memory data structure. When in-memory file system reaches its limit, it is dumped to an in-memory disk. The write to disk is sequential in nature and an index per row is generated for efficient lookup. These indices are also persisted on disk along with files. A compaction process is run periodically to merge the files on disk to save space. This process is similar to compaction process in Bigtable.
A read operation first queries the in-memory data structure. If it is unable to find the key in memory then it starts reading from files on disk in decreasing order in which they were created. To avoid looking up for keys which are not present in storage system a bloom filter(For more details on bloom filter read 1, 2 & 3) is used to prevent unnecessary lookups.
Implementation Details
Core modules of Cassandra :
- Partitioning module
- Cluster membership module
- Failure detection module
- Storage engine module
All the above modules are implemented from scratch in Java programming language. Cluster membership and failure detection are built on top of network layer with non-blocking I/O. Communication related to request routing & replication relies on TCP and all other communications use UDP. Any read/write request goes through the following flow:
- Identify the node(s) for the key in the request
- Route the request to respective nodes
- If there is no response in a pre-configured then fail the request and return error response
- Pick the response with latest timestamp and return the response to client
- Update the replicas with stale response
As Cassandra interacts with disk to dump files which are sequential in order, it does not need a lock while reading them. The reason behind a lockless system is that the files dumped on disks are immutable hence no locking is required to access data from them. This avoids lot of complexity that comes with typical B-tree based storage implementations.
So how does Cassandra solved the inbox search problem?
A per-user index is maintained for messages that are either sent or received by a user. For search by term, user_id
becomes the key and words that are part of the message form a super column. Each message containing the word gets associated with super column as a column using message reference. So if a message is hey, let's catch up today
then the word hey
becomes a super column which will contain a column consisting of reference of actual message. In order to fulfill the search request, we need to list down messages by traveling one level down from key. First we access the key user_id
and then the search string super-column.
For search by recipient_id
, the user_id
becomes the key and recipient_id
becomes super-column with message references.
In order to speed up the search, Cassandra provides a smart-caching mechanism. So when a user clicks on the search bar, a request is sent to cluster to start buffering cache with user’s index. So now when user actually sends the search request, the superset of search result is already in the memory and can be queried much faster compared to querying from the actual Cassandra node.
This paper is a great example of how innovation can be done from learning the existing systems and coupling these learnings together. It is not necessary to reinvent the wheel for all the components of system you are building but rather you can choose to pick up the pieces that suits your use case from various other available solutions and build a solution that fulfills your use case by merging all these learnings together as a single functioning unit.
References
- Original Paper
- Tech talk: Cassandra at Instagram
- Code repository: Github mirror