Tarantool in one ABS

How we implemented a distributed cache on Tarantool in one ABS.

The development of any reasonably serious software, whether a matrix calculator or the AI of a self-driving car, always involves a subject area, particular technologies, algorithms and data structures, code architecture, a development process, and many other clever terms from the IT world.

This article presents one solution from the world of high-performance and distributed systems. Under the cut, you will find a description of just a small number of the tasks and problems we encountered, as well as some interesting algorithms, approaches to building the system architecture, methods for optimizing queries, and a little about the process of developing and testing a solution based on Tarantool, an in-memory computing platform with a flexible data schema for efficiently building high-load applications.

Statement of the problem

A bank needed to speed up the execution of requests to its Automated Banking System (ABS).

The task was set as follows. There is an ABS database that consumer applications access frequently, loading it and experiencing long delays that average up to 600 ms. The ABS database needs to be offloaded. To do this, we decided to move some of the requests to a cache, which must deliver latencies of no more than 200 ms at the 90th percentile (that is, at least 90% of cache requests must complete in under 200 ms) under a load of 1500 requests per second. Requests can be arbitrary: a single query may fetch multiple objects and JOIN data from several tables. The number of objects to be stored in the cache runs into the millions, so a single cache instance is not enough and distributed storage is required.

The choice fell on the in-memory database Tarantool, or rather on a distributed solution based on it, Tarantool Data Grid (TDG, article on Habr), which had already proved its effectiveness in the development of distributed applications. TDG is a boxed solution with numerous built-in mechanisms that simplify development and help meet information security requirements, which makes it a very convenient tool for implementing this task.

The following model of cache organization was adopted: at the first launch, the cache is filled with data from the ABS database replica, after which it continuously monitors and applies all changes in the database. This scheme was chosen because the cache works not with the main ABS database but with its replica, which avoids additional load on the main database. Its disadvantage is that consumer applications have to track how fresh the data in the cache is and how far the replica lags behind the main database, so that, when necessary, they can go directly to the main database to guarantee up-to-date data.

The main difficulty that arises in this task is the implementation of data synchronization, which includes:

  • initial import of data from an ABS DB replica to TDG;
  • tracking changes in the ABS DB replica and keeping the data in TDG up to date.

In addition, it is necessary to ensure high cache performance, that is, to ensure that the specified SLAs are met.

Solution architecture

Synchronizer

To separate the load of external requests to TDG from the load of extracting data from the ABS database, we decided to develop a separate data synchronizer component. It implements all the logic for extracting data from the ABS database, tracking changes in it and importing it into TDG. To provide fast data import into TDG, as well as scalability and fault tolerance, the synchronizer was built as a distributed application implementing the producer-consumer pattern: the producer generates object IDs and puts them into a queue (each producer has its own), while consumers take IDs from the queue, fetch the corresponding objects and send them to TDG.
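
The synchronizer's actual implementation is not shown in the article; below is a minimal sketch of the producer-consumer pattern it describes, written in Lua on Tarantool fibers. The queue capacity and the fetch_changed_ids(), build_object() and send_to_tdg() helpers are assumptions used only for illustration.

```lua
-- Minimal sketch of the producer-consumer pattern used by the synchronizer.
-- fetch_changed_ids(), build_object() and send_to_tdg() are hypothetical
-- helpers standing in for SQL queries to the ABS DB replica and for the
-- call that ships objects to TDG.
local fiber = require('fiber')

local queue = fiber.channel(1000)  -- each producer owns its own queue

-- producer: periodically discovers IDs of created/updated/deleted objects
local function producer_loop()
    while true do
        for _, id in ipairs(fetch_changed_ids()) do
            queue:put(id)  -- blocks when the queue is full (backpressure)
        end
        fiber.sleep(1)
    end
end

-- consumer: builds the full object for an ID and ships it to TDG
local function consumer_loop()
    while true do
        local id = queue:get()
        send_to_tdg(build_object(id))
    end
end

fiber.create(producer_loop)
for _ = 1, 8 do  -- several consumers per producer
    fiber.create(consumer_loop)
end
```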

TDG

TDG, in turn, is also a distributed solution. The system can be configured flexibly, assembled like a constructor from components, each of which is assigned one of the predefined roles. For our purposes, components of two roles were enough: connector and storage. The connector provides an interface to the outside world; the project uses GraphQL both for requests from consumer applications and for requests from the synchronizer. Storages hold the data, and sharding is used to distribute it across them. To distribute the load between connectors you can use, for example, nginx; the connectors, in turn, form requests to the storages to perform the necessary operations on the data.

It is worth noting that TDG allows you to flexibly implement query handlers in the Lua language, providing an API for performing the necessary actions (for example, map/reduce). This feature allowed us to further optimize queries based on the structure of the queries themselves and the data model.
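TDG's handler API is not quoted in the article, so the snippet below only illustrates the map/reduce idea on plain Tarantool with vshard; the space and function names are assumptions.

```lua
-- Illustration of a map/reduce-style query over a sharded cluster using plain
-- vshard (TDG wraps this in its own handler API). The 'card' space and the
-- count_cards_local function are assumptions.
local vshard = require('vshard')

-- "map" step, registered on every storage node: count local card objects
function count_cards_local()
    return box.space.card:len()
end

-- "reduce" step, executed on the router: sum the partial results
local function count_cards_total()
    local total = 0
    for _, replicaset in pairs(vshard.router.routeall()) do
        local count, err = replicaset:callro('count_cards_local', {}, {timeout = 1})
        if err ~= nil then
            error(err)
        end
        total = total + count
    end
    return total
end
```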

Ensuring that data is up-to-date

The synchronizer must perform two tasks:

  1. initial import of data from the ABS database to TDG;
  2. synchronization of operations for creating/updating/deleting items.

The initial import is required when the cache is first started, as well as for complex data model updates. It may also be needed, for example, if data is taken into the cache from an ABS DB replica and the replica undergoes a hard reset (that is, it was dropped for some reason and a new one was initialized from the master): in this case there is no guarantee that all data updates between the moment the replica was disconnected and the moment it was restarted have been taken into account.

You can use two mechanisms to track updates:

  • Triggers on target tables in the ABS DB replica that fire on any change to cached objects.
  • Special tables that contain objects ready to be sent to the cache, together with the timestamp of the last update.

In the first case, the triggers stack modified/deleted objects into separate tables, which the synchronizer periodically reads and clears.

In the second case, a mechanism for filling these tables has to be implemented on the ABS side; this can be stored procedures or software that in some way (for example, by periodic runs) tracks changes in the data, forms the modified/deleted objects and adds them to the tables.

In both cases, the producer periodically queries the tables containing the modified data and determines the IDs of newly created/modified/deleted objects. These identifiers are then passed to the consumers, who build the complete objects and send them to TDG.

Fighting race conditions

When using the producer-consumer pattern, race conditions may occur when updating data. Imagine that an object was updated twice within a short interval, and the two changes were picked up by different consumers. Due to network problems, the consumer who built the earlier version of the object may deliver it to TDG later than the other consumer, and an invalid, outdated object ends up in the cache.

To resolve this race, TDG uses object versioning based on the object's update timestamp. Each object in the cache gets a version field equal to the time of its last update. When an object is built by an SQL query against the ABS database, a timestamp taken from the database itself is added to the result. Thus an object built earlier will always have a smaller timestamp than an object built later (assuming the database's own clock is monotonic).

TDG has a mechanism by which an object with a later version overwrites an object with an earlier version, but not vice versa. The same rule applies to deletion: the object is not physically removed from TDG but is marked as deleted, keeping the deletion timestamp. In this way race conditions in update/delete conflicts are also resolved.
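
TDG's internal implementation is not shown in the article; the sketch below only restates the rule ("a later version wins, deletion is a tombstone") on a plain Tarantool space. The 'card' space and its simplified field layout are assumptions.

```lua
-- Restating the versioning rule on a plain Tarantool space. Assumes a space
-- 'card' with a format naming the fields card_id, version, deleted, payload.
local function put_versioned(card_id, version, payload)
    local current = box.space.card:get(card_id)
    if current ~= nil and current.version >= version then
        return current  -- incoming object is not newer, keep what is stored
    end
    return box.space.card:replace({card_id, version, false, payload})
end

local function delete_versioned(card_id, version)
    local current = box.space.card:get(card_id)
    if current ~= nil and current.version >= version then
        return current
    end
    -- tombstone instead of physical deletion, keeping the deletion timestamp
    return box.space.card:replace({card_id, version, true, box.NULL})
end
```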

Object versioning also solves another problem that arises when data has to be forcibly re-imported. Suppose that for some reason the ABS database replica was recreated. While it was being recreated, there may have been changes in the main ABS database that can no longer be tracked by triggers. In this case a data reimport is started, and objects with new versions overwrite objects with outdated versions. However, some objects may have been deleted while the replica was being recreated. Such objects can be removed from the cache after the reimport finishes, by deleting all elements whose version was not updated, i.e. whose value is less than the timestamp of the start of the reimport.
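
A sketch of that cleanup step, again on a plain Tarantool space; the 'version' secondary index and the field names are assumptions.

```lua
-- After a reimport finishes, drop every object whose version was not refreshed,
-- i.e. is older than the moment the reimport started. Assumes a non-unique
-- secondary index on the version field of the 'card' space.
local function cleanup_stale(reimport_started_at)
    local stale_ids = {}
    for _, tuple in box.space.card.index.version:pairs(
            reimport_started_at, {iterator = 'LT'}) do
        table.insert(stale_ids, tuple.card_id)
    end
    -- delete outside the iteration to avoid modifying the index while scanning it
    for _, id in ipairs(stale_ids) do
        box.space.card:delete(id)
    end
end
```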

Fault tolerance

One of the most important characteristics of a distributed system is the ability to keep working when any of its elements fails. Here you should follow the rule of building a reliable system from unreliable components: always assume that any node (a database instance, a server, a producer or a consumer) can stop working for one reason or another (which, of course, does not mean that such crashes can simply be ignored).

Recall that both the synchronizer and TDG have two types of nodes. Consumers in the synchronizer and connectors in TDG are stateless nodes. If single failures occur among a large number of such nodes, requests are simply directed to other nodes of the same type, which does not lead to significant degradation of the system, so no additional fault tolerance mechanisms are needed for them. To avoid a complete outage when physical servers go down, you should keep a reserve of components of the same type on different servers.

In TDG, when configuring the cluster topology, you can create replicas for the database instances (storages), which prevents data loss in case of failures. It is recommended to place storages and their replicas on different servers so that the failure of any one server is covered.

Let's now consider how fault tolerance is implemented for the synchronizer's producers. A standby producer is always started alongside each producer. An auxiliary table is created in the ABS database replica, which the master producer periodically updates. If the table has not been updated for some time, the standby producer notices this and takes over, becoming the master. The ABS database tables can also store the internal state of the current master producer, for example whether the initial data load has finished or what stage it is at. When switching, the new master restores this state and continues its predecessor's work.
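
A minimal sketch of the standby producer's takeover logic described above. read_heartbeat(), restore_master_state() and start_producing() are hypothetical helpers around the auxiliary tables in the ABS DB replica, and the timeouts are assumptions, not the project's actual values.

```lua
-- Standby producer: watch the master's heartbeat and take over if it stops.
-- read_heartbeat(), restore_master_state() and start_producing() are
-- hypothetical helpers working with the auxiliary tables in the ABS DB replica.
local fiber = require('fiber')

local HEARTBEAT_TIMEOUT = 30  -- seconds without heartbeat before takeover
local CHECK_PERIOD = 5

local function standby_loop()
    while true do
        local last_beat = read_heartbeat()
        if fiber.time() - last_beat > HEARTBEAT_TIMEOUT then
            restore_master_state()  -- e.g. whether the initial load has finished
            start_producing()       -- the standby becomes the new master
            return
        end
        fiber.sleep(CHECK_PERIOD)
    end
end

fiber.create(standby_loop)
```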

High performance on large queries

Tarantool is an in-memory database that allows you to achieve great performance (a few examples: Article 1, Article 2, Article 3). Actual performance, usually measured in requests processed per second, depends on many factors. For example, if we want to pull the entire contents of a database of millions of elements in a single request, and in structured form at that, we are unlikely to reach a million such requests per second. In such cases, obtaining the required performance takes analytical work: changing the structure of the queries and optimizing them.

Data model optimization

One feature of the solution is that the cache data model differs from the relational model of the ABS database. On the one hand, you could copy all the data of the main database into the cache without changing its structure, which would lead to unnecessary duplication and, in effect, a complete copy of the queries with all their JOINs of relational objects; in that case you can hardly expect a significant gain in cache speed relative to the ABS database. On the other hand, ideally, to get minimal latency, the cache should store “ready-made” objects that are returned for requests without performing any JOINs. Such objects can be obtained by executing complex SQL queries against the ABS database and caching their results in TDG.

Suppose you need to get high performance on several known queries. In our case, these were queries with the following simplified structure:

  • first request: get basic information on some customer’s cards;
  • second request: get full information on the specified card.

Suppose consumer applications request the cards of some client (a request of the first type). This client has, say, 20 cards. If all of them are stored in the cache as a single object, the response to the request can be returned very quickly (given that the cache is an in-memory database). However, if the data of one of the cards changes, the entire cache object will have to be updated.

Thus, when developing the cache data model, you need to find a balance by introducing “half-finished” objects so that final responses to queries can be assembled more quickly. How best to shape such a model is a matter of analysis; many factors have to be taken into account, such as the requirements for maximum response latency, ease of implementation, the structure of the data and the queries, and so on.

In our case, the model of storing cards as separate objects was chosen. After denormalization, all the information fit into 3-4 tables (spaces in Tarantool terminology), down from more than a dozen tables in the ABS database.
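
The project's actual schema is not given in the article; the snippet below is only an illustrative guess at what one of the denormalized spaces could look like.

```lua
-- Illustrative guess at one of the denormalized spaces; the names, fields and
-- indexes are assumptions, not the project's actual schema.
box.schema.space.create('card', {if_not_exists = true})
box.space.card:format({
    {name = 'card_id',   type = 'string'},
    {name = 'client_id', type = 'string'},
    {name = 'version',   type = 'unsigned'},                 -- last-update timestamp
    {name = 'deleted',   type = 'boolean'},                  -- tombstone flag
    {name = 'payload',   type = 'map', is_nullable = true},  -- denormalized card attributes
})
box.space.card:create_index('primary', {
    parts = {{field = 'card_id', type = 'string'}},
    if_not_exists = true,
})
box.space.card:create_index('version', {
    parts = {{field = 'version', type = 'unsigned'}},
    unique = false,
    if_not_exists = true,
})
```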

However, the denormalized model still requires JOINs when executing queries. Given that data in TDG is distributed via sharding, each JOIN may turn into collecting pieces of an object from the entire cluster. For the request mentioned above, for example, a client's cards may be stored on different nodes of the cluster, and joining them may take a long time because of the large number of network requests (especially under heavy load), which increases the total response latency.

Note that this optimization brought request execution down to 200-300 ms at the 90th percentile at 1500 requests per second (measured on the client card search queries), which is still not enough to complete the task. It is worth noting that requests of the second type (full information on a card) performed faster, showing latencies of 20-30 ms under the same conditions.

Optimization of distributed queries

Let us continue with the example of the “get cards by customer” request. As mentioned above, TDG has many storages, and by default a client's cards will be scattered across the distributed system. Collecting the cards from the entire cluster can take a significant amount of time, especially when the network is under heavy load. It would be logical to place all the cards of one client on one storage.

Data distribution across storages in TDG is performed by sharding on the primary index or a part of it. For all the cards of one customer to end up on one storage, it is enough to add the customer ID to the card's primary index (initially the index consisted of the card ID field only) and to shard by the customer ID alone. When executing the request, a single “visit” to the storage determined by the input ID is enough to pick up all the cards from there.
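
Plain vshard is used below to sketch what the optimization amounts to; TDG's own sharding configuration is not quoted in the article, and the space, function names and timeout are assumptions (the bucket_id field and index that vshard requires on the space are omitted for brevity).

```lua
-- Sketch of the single-hop lookup after the optimization: the primary index is
-- now {client_id, card_id}, while the bucket id is derived from client_id
-- alone, so all cards of one client live on one storage.
local vshard = require('vshard')

-- registered on every storage node: select by the first part of the composite key
function get_cards_local(client_id)
    return box.space.card:select({client_id})
end

-- executed on the router: exactly one network hop to the storage that owns
-- this client's bucket
local function get_client_cards(client_id)
    local bucket_id = vshard.router.bucket_id_strcrc32(client_id)
    return vshard.router.callro(bucket_id, 'get_cards_local',
        {client_id}, {timeout = 1})
end
```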

This optimization of local card placement significantly speeds up the execution of requests of this type. In our case, the speedup was roughly an order of magnitude: from over 200 ms to 20-30 ms at the 90th percentile at 1500 requests per second, considering only the client card search queries. It is worth noting that each request really did return a lot of data: the size of the response to a single request reached several hundred kilobytes. The performance of queries of the second type (full information on a card) remained unchanged at the same 20-30 ms.

The described optimization is not always possible and imposes some additional restrictions. The first important point is the need to track changes of a card's customer ID. If the owner of the card changes, we get an object with a new primary index value (recall that after the optimization the primary index consists of two fields, one of which is the client ID). In this case the previous object must also be deleted, since TDG will not do this for us.

It is also worth noting that the optimization relied on the specific features of the request. We were able to speed up one request, but there may be many different types of requests, and in general it may not be possible to speed up all of them. Moreover, speeding up some requests may slow down others. You have to find a balance between the performance requirements of different queries and the ability to optimize them. In the general case, when the queries are not known in advance or are very diverse, such optimization is not worth doing.

Development

Developing a distributed system is not an easy task. In addition to the need for high performance and for scaling and fault tolerance mechanisms, there is usually a long list of functional requirements, and convenience of operation should not be forgotten either.

To make sure the functionality works correctly, we covered all the emerging use cases with tests. These were mainly integration tests, of course, since they required the interaction of the various elements of the distributed system (the ABS database, the synchronizer, TDG). Here you cannot do without modern test automation tools such as Docker, pytest, continuous integration (CI), and so on.

Besides checking functional requirements, tests are very helpful for verifying the mechanisms that keep a distributed system correct. For example, you need to make sure that the change of producer-leader happens correctly: we write a test. If a race condition can occur: we write a test. To check that a race condition is resolved, you can, for example, script a replay of the situation in which it arises and run it in a small loop. With CI in place, this loop will be replayed often enough to make sure the race does not occur.

An important point is the absence of flaky tests, the ones that “sometimes fail” on CI. Their presence often indicates problems in the implementation of the mechanisms that keep the distributed system correct, or, for example, some race condition that has not yet been identified. All such tests were carefully studied, and the causes of their failures were fixed.

In total, there are now more than 200 integration tests in this seemingly small project.

For ease of operation, a large number of metrics are implemented in the system. These are both low-level metrics, such as the amount of memory used, established connections, objects in the database, queue sizes, etc., and higher-level ones, such as histograms of query execution time at different levels (queries to the ABS database, queries to TDG), the number of queries performed per second, etc. By the way, Tarantool has a Lua module, metrics, which allows you to collect metrics in a format suitable for Prometheus and then display graphs in Grafana. Practice has shown that any of the collected metrics can turn out to be important for analyzing incidents and investigating the properties of the system.
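
As an illustration of the metrics module mentioned above, here is a minimal sketch of collecting a couple of application-level metrics and exposing them for Prometheus; the metric names and the port are assumptions.

```lua
-- Minimal sketch: collect custom metrics with the Tarantool 'metrics' rock and
-- expose them over HTTP for Prometheus. Metric names and the port are assumptions.
local metrics = require('metrics')
local prometheus = require('metrics.plugins.prometheus')
local http_server = require('http.server')
local clock = require('clock')

-- higher-level metrics: request latency histogram and request counter
local request_latency = metrics.histogram(
    'cache_request_latency_seconds', 'Latency of cache requests')
local requests_total = metrics.counter(
    'cache_requests_total', 'Total number of cache requests')

-- wrap a request handler so every call is counted and timed
local function handle_request(fn, ...)
    local started = clock.monotonic()
    local result = fn(...)
    requests_total:inc(1)
    request_latency:observe(clock.monotonic() - started)
    return result
end

-- expose everything on /metrics for Prometheus to scrape
local httpd = http_server.new('0.0.0.0', 8081)
httpd:route({path = '/metrics'}, prometheus.collect_http)
httpd:start()
```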

In addition to metrics, it can be useful to create endpoints that, for example, return the current state of some system component over HTTP. For instance, an endpoint that reports the current producer-leader proved very useful.
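
A tiny sketch of such an endpoint on the Tarantool http server; current_leader(), the path and the port are assumptions.

```lua
-- Operational endpoint reporting the current producer-leader over HTTP.
-- current_leader() is a hypothetical accessor for the synchronizer's state.
local httpd = require('http.server').new('0.0.0.0', 8082)

httpd:route({path = '/leader'}, function(req)
    return req:render({json = {leader = current_leader()}})
end)
httpd:start()
```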

Another tool that allows you to investigate performance issues is query tracing. TDG has built-in support for OpenTracing, which was actively used in query optimization.

Conclusions

Tarantool Data Grid allows you to deploy and configure a distributed cache out of the box and to implement rather complex logic for its operation. To get maximum performance, you may need to optimize queries, based on their particular features, at the level of the system architecture and the data model.

Much attention in this article is paid to the development of the synchronizer, a distributed system that imports data into the cache and keeps it up to date. Scaling, fault tolerance, ease of operation, race conditions: these are some of the mechanisms we considered and the problems that can arise when building such systems.

Because of the complexity of the development, special attention was paid to testing, both against functional requirements and for the correct operation of the various mechanisms of the distributed system. Adding metrics and endpoints that expose the state of the system improved its maintainability, and query tracing helped find bottlenecks and improve performance.

Links

You can download Tarantool from the official website.

Valery Radokhleb
Web developer, designer
