An ETL tool (Import Framework) is responsible for pulling data out of source systems and placing it into a data warehouse.

In our case at Capillary, ETL was needed for a few common reasons:

  • When a client signs up with Capillary for loyalty and they had already been capturing customer details (perhaps even running an old loyalty program), they will never want to start from zero. Historical data has to be in the system before they go live.
  • API integrations provide realtime data from the cloud, but what if internet connectivity is bad? The next best option is to capture all the data locally and sync it to the cloud once a week or so.
  • A client may want to award extra points to a certain group of users, or tag some stores to a newly created sub-zone. All of this can be done from the UI one by one, but providing bulk functionality saves a lot of time.

We developed our ETL tool to address all these needs. It accepts a CSV file as input and loads it into a MySQL database.

Basic Flow

Before we jump into how we improved the system, let's understand its basic flow. The basic flow of any ETL tool is almost the same, and the sequence of events is shown below.

[Flow diagram]

As these steps are self-explanatory from the image, I will instead draw out the significant features of the ETL tool at Capillary.

Transformers:

We support a lot of data formatting within the ETL tool, which makes the system very flexible. The most common transformers are:

Date: A DateTime can be passed in any valid format, and the transformer can be set as Format=>Y/m/d (or any valid format syntax).

Expression: Suppose you have rate (column 3) and quantity (column 4) of line items, but the amount column was not shared in the file. You can derive amount as Expr=>{3}*{4} in the transformer.

Merge: Address is the best use case for the merge transformer. Merge=>1,2[Separator=>, ] merges columns 1 and 2 into a single column.
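To make the three transformer types concrete, here is a minimal sketch in Python (the actual tool is PHP; function names, the accepted date formats, and the sample row are hypothetical):

```python
import re
from datetime import datetime

def date_transformer(value, fmt="%Y/%m/%d"):
    """Try a few input formats, re-emit in the configured output format."""
    for in_fmt in ("%d-%m-%Y", "%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, in_fmt).strftime(fmt)
        except ValueError:
            continue
    raise ValueError("unrecognised date: %r" % value)

def expr_transformer(row, expr="{3}*{4}"):
    """Evaluate an expression over 1-indexed column references like {3}."""
    resolved = re.sub(r"\{(\d+)\}", lambda m: row[int(m.group(1)) - 1], expr)
    return eval(resolved)  # the real tool would use a safe expression evaluator

def merge_transformer(row, cols=(1, 2), sep=", "):
    """Merge the given 1-indexed columns into a single field."""
    return sep.join(row[c - 1] for c in cols)

row = ["MG Road", "Bangalore", "250", "4"]   # cols: street, city, rate, quantity
print(date_transformer("25-12-2014"))        # -> 2014/12/25
print(expr_transformer(row))                 # "250*4" -> 1000
print(merge_transformer(row))                # -> "MG Road, Bangalore"
```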

Auto Correction:

ETL has been in use for a long time, and the system had to be tuned based on past knowledge of the incoming data. For example, a transaction worth 100000 INR seems very unlikely and could be marked as fraud. Or, if amount is not set while rate, quantity and discount are mapped, we can safely assume amount = rate*quantity-discount. ETL performs these actions by itself and reports them.
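The amount-derivation rule above can be sketched as follows (a hypothetical Python stand-in for the PHP implementation; the record and report shapes are assumptions):

```python
def autocorrect_amount(record, report):
    """If amount is missing but rate, quantity and discount are mapped,
    derive amount = rate * quantity - discount and log the correction."""
    if record.get("amount") is None and all(
        record.get(k) is not None for k in ("rate", "quantity", "discount")
    ):
        record["amount"] = record["rate"] * record["quantity"] - record["discount"]
        report.append(("amount_derived", record["amount"]))
    return record

report = []
rec = autocorrect_amount(
    {"rate": 200.0, "quantity": 3, "discount": 50.0, "amount": None}, report)
print(rec["amount"], report)  # 550.0, with the correction recorded for the report
```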

Report Generation:

When an import is done, it's quite natural to have some erred records or some auto-corrections applied. We generate stats for each of them and save them in the DB. Such records are also extracted into files (one file for each type: error, auto-correction and file dump), and the generated CSV files are pushed to S3.

Main DB:

Until this phase, whatever happens, nobody is bothered, as everything happens on temporary tables. But once data is pushed to the main DB, there is no rollback. This phase consists mostly of INSERT/UPDATE queries on the realtime DB and is run as a single MySQL transaction.
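The all-or-nothing property of that final phase comes from wrapping it in one transaction. A minimal sketch, with sqlite3 standing in for MySQL and a hypothetical table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (tenant_id INT, email TEXT)")

try:
    # One transaction: commits only if every statement succeeds,
    # rolls back automatically if any statement raises.
    with conn:
        conn.execute("INSERT INTO users VALUES (1, 'a@example.com')")
        conn.execute("INSERT INTO users VALUES (1, 'b@example.com')")
except sqlite3.Error:
    pass  # on error, nothing from this batch reaches the main table

print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 2
```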

FTP integration:

We can upload files through the UI and import them, but there is always a restriction on file size. With the FTP integration functionality, the system reads from an FTP server and imports using the specified configurations and mappings (saved as a template for each profile), based on the scheduler set.

Technologies Used

Scripting language: PHP 5.3

Database: MySQL 5.5.40, Memcache/Redis

Platform: Ubuntu 12.04

Web server: Apache 2.2.22

Constraints and Concerns:

Engineering gets interesting when we have more and more constraints, and ETL is no different. I would like to share a few of them which played a vital role in designing the tool.

Multi Tenancy:

At Capillary, all clients (tenants) share the same datastores, be it MySQL or Memcache/Redis. The issue is obvious: an import happening for Tenant A should not affect Tenant B. It was addressed by revisiting the MySQL schema:

  • All MySQL tables use InnoDB as the engine, so locking is at row level rather than table level
  • All indexes are of the format (tenant_id, field_1[, field_2]) to make sure any query on Tenant A will not affect Tenant B. All our queries/joins also include tenant_id.
  • As the system grew, we used MySQL partitioning and custom sharding policies (based on tenant_id) to insulate tenants from each other.
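The tenant-first index convention can be illustrated like this (sqlite3 standing in for MySQL; the table and index names are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE loyalty_txn (tenant_id INT, user_id INT, amount REAL)")
# Index leads with tenant_id, so each tenant's rows cluster in their own range.
conn.execute("CREATE INDEX idx_tenant_user ON loyalty_txn (tenant_id, user_id)")
conn.executemany("INSERT INTO loyalty_txn VALUES (?, ?, ?)",
                 [(1, 10, 99.0), (2, 10, 45.0)])

# Every query carries tenant_id, so a scan for tenant 1 never walks tenant 2's rows.
rows = conn.execute(
    "SELECT amount FROM loyalty_txn WHERE tenant_id = ? AND user_id = ?",
    (1, 10)).fetchall()
print(rows)  # [(99.0,)]
```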

Concurrency Management and Locking:

We solved the problem of multi-tenancy, but what if two imports happen in parallel for the same tenant? Both imports may try to lock the same record, followed by lock wait timeouts on MySQL queries. We introduced a locking mechanism with memcache, with two checks, to avoid such race conditions:

  • For the same tenant and the same profile, a second import cannot be running
  • Across all tenants, no more than 5 processes can run at a given point in time.

The second check keeps the appserver and DB machine from getting overloaded.
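The two checks can be sketched as below. This is a simplified Python stand-in: a plain dict replaces memcache, and the key names and cap are illustrative (a real deployment would rely on memcache's atomic add()/incr()).

```python
cache = {}          # stands in for memcache
MAX_GLOBAL = 5      # across all tenants

def try_acquire(tenant_id, profile_id):
    key = "etl_lock:%s:%s" % (tenant_id, profile_id)
    if key in cache:                                   # check 1: one import per tenant+profile
        return False
    if cache.get("etl_running", 0) >= MAX_GLOBAL:      # check 2: global process cap
        return False
    cache[key] = 1
    cache["etl_running"] = cache.get("etl_running", 0) + 1
    return True

def release(tenant_id, profile_id):
    cache.pop("etl_lock:%s:%s" % (tenant_id, profile_id), None)
    cache["etl_running"] = max(0, cache.get("etl_running", 0) - 1)

print(try_acquire(1, "txn"))  # True
print(try_acquire(1, "txn"))  # False: same tenant+profile already running
```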

Dynamic Batching :

When we are talking about files with millions of rows, processing the whole file in memory is not possible. Batching was inevitable for every operation: reading from the file, validation, or writing to the DB. For example, while validating email, the first 20000 records are validated, then the next 20000, and so on till the end of the file. Only then is the next action picked up.

Still the problem was not fully solved. One file might have 10 columns while another has 100, so a 20K batch of each will vary greatly in data size. So batching based on a fixed number of records was tweaked to dynamically reduce the batch size. We used avg_row_length from SHOW TABLE STATUS once the temp table was created (or the average size of the first 10 records of the CSV file) and prorated the configured batch size to get the new batch size.

For example, assume we have set our average row size as 9000 bytes and the validation batch size as 25000. If the file's actual average row length is 12000 bytes, the new batch size is floor(9000*25000/12000), i.e. 18750 records.
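The proration above is a one-liner; a sketch with the example numbers (the function name is hypothetical):

```python
import math

def dynamic_batch_size(configured_batch, assumed_row_len, actual_avg_row_len):
    """Shrink (or keep) the batch so memory per batch stays roughly constant."""
    return math.floor(assumed_row_len * configured_batch / actual_avg_row_len)

# assumed 9000-byte rows, 25000-record batches, file actually averages 12000 bytes/row
print(dynamic_batch_size(25000, 9000, 12000))  # 18750
```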

Also worth mentioning: the configuration is realtime. If the server looks loaded, you can change the batch size to a smaller number, and from the next batch onwards the same ETL process will use the new batch size.
Though memory_limit is 2GB, no import has taken more than 1GB since dynamic batching was introduced.

Co-existing with the Realtime Flow:

At Capillary, after an import, people expect to see the result immediately in the realtime flow. For that reason, we have to get the data into the realtime DB, not a warehouse DB. But reading and writing millions of records through ETL when we already have heavy traffic will slow down both our realtime flow and the import.

We separated heavy-traffic (daytime) and low-traffic (night) hours in each colo, then applied restrictions on the maximum file size during heavy-traffic hours. So during the day we may support 200K records, while at night it might be 10M. We made the safe assumption that large files can wait and need not be processed in realtime.

Multi-Server Environment:

A single server cannot scale to serve all requests when the request count goes high, so we also had to work on the behaviour of ETL in a multi-server environment. Common requirements like session management were already handled by our framework. Still, we had our own problems.

The whole ETL process happens through different page submits. If the file upload happened on server 1 and the temp DB creation request hits server 2, the file would not be available locally on server 2. We made use of another internally developed tool, Fileservice (more about it in another blog), which pushes the uploaded file to S3 and stores the handle in the DB. If the requested file is not found on the local server at any point, we fetch it from Fileservice and proceed.

A few things we learnt the hard way:

Murphy’s law has never gone wrong for us. There have been days when we found no issue in the code, yet the system was unresponsive or failing for no apparent reason.

PIOPs:

We have hosted most of our services on the AWS cloud for years. On a random day, our DB master stopped responding and we had our longest downtime. IO was very slow, and we finally decided to upgrade our DB machine from 32GB to 64GB (56GB InnoDB buffer pool size) and attach a volume with PIOPS (provisioned IOPS). Before the upgrade we used to hit at least 2 lock wait timeouts a week (imports colliding with realtime updates); since then, not even 2 timeouts in the last 10 months! Similar improvements were visible in most queries. If you are expecting a heavy read-write load, PIOPS makes a big difference.

MySQL limits:

For products (catalogs), each tenant can create any number of custom attributes and map their values for each product during import. In the temporary table (created during the Temp DB phase), a column and an index are created for each attribute. Some tenants had as many as 70 attributes, and that is how we learnt two limits in MySQL:

  • The maximum row size for a table is 65,535 bytes
  • The maximum number of secondary indexes InnoDB supports per table is 64

Luckily the error message from MySQL was obvious, and we could identify the issue. Later we limited the number of custom attributes per import to a smaller number (creating too many attributes was mostly improper usage of the system).

MySQL Replication:

We had been running statement-based replication for a long time, and ETL used INSERT .. ON DUPLICATE KEY UPDATE (a replication-unsafe query) quite frequently. We decided to try row-based replication and started testing it in our test environment. Everything was good until we did an import with a million records, which created around 10GB of binlogs. The plan to change the replication format was dropped, though we do use mixed mode in some cases.

MySQL vs PHP

We had to decide between MySQL and PHP for validations. This is how we split them:

  • Null checks, enum checks, greater-than, less-than etc. were faster in MySQL. (Data was fed to PHP after reading from MySQL by ID-based ranges)
  • Regex took similar time in both cases

Hacks 

Line count:

To count the records in CSV files, a loop with fgets till the end of file was used. As we are using Linux servers and have decided to stick with them, the “wc -l” command was tried instead. For a file with a million records, the difference was a few minutes versus some tens of seconds with wc.
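The two approaches can be compared with a sketch like this (Python standing in for the PHP fgets loop; note that `wc -l` counts newline characters, so a file without a trailing newline reports one less):

```python
import os
import subprocess
import tempfile

# Build a sample file with a known number of lines.
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
    f.writelines("row%d\n" % i for i in range(1000))

def count_in_process(path):
    """Read line by line in-process (the fgets-style loop)."""
    with open(path) as f:
        return sum(1 for _ in f)

def count_wc(path):
    """Shell out to wc -l (Linux); parse the leading count from its output."""
    out = subprocess.check_output(["wc", "-l", path])
    return int(out.split()[0])

print(count_in_process(path), count_wc(path))  # 1000 1000
```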

Non Singleton class to Singleton class:

During the ETL process, the controller class was initialized more than 20 times in a single thread, and it internally constructed another 10 classes, consuming memory and time. Initializing the controller class that often was too much overhead, so making it a singleton was the best solution. But making code changes in many files was not advisable either.

We made use of __call() and __callStatic() (PHP magic methods) and a pseudo-class to solve it: the pseudo-class forwards every call to a single shared controller instance, so no caller had to change.

Duplicate check across multiple columns:

In our system, a transaction (bill) is uniquely identified by the combination of tenant_id, user_id, txn number, txn date and store. During import, an additional restriction was set: the same line item cannot exist twice in a transaction (it should come as qty+qty2 in a single row). The restriction was needed for another feature. A big file might be chunked into smaller files and imported; smaller file 1 might have 3 line items and file 2 another 2 line items of the same transaction, in which case we have to add the line items from both files to the same transaction (configuration driven). But what if the same file is re-imported? The line items would appear twice. The same check was enabled within a file as well: in a given file, line items cannot repeat for the same transaction. To identify such records and mark them invalid, we ran a GROUP BY query over these columns, followed by an ID-based UPDATE.

Grouping on 5 columns was inherently slow, and adding a LIMIT on top made it worse. So we tried something different: creating a column which is a hash key of all five columns, and indexing that column.
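The idea is to collapse the five-column group key into one indexed hash column and group on that instead. A sketch with sqlite3 standing in for MySQL (table, column names and the choice of MD5 are illustrative):

```python
import hashlib
import sqlite3

def txn_hash(tenant_id, user_id, txn_number, txn_date, store):
    """One hash over the five identifying columns, with a separator to
    avoid accidental collisions from concatenation."""
    raw = "|".join(str(v) for v in (tenant_id, user_id, txn_number, txn_date, store))
    return hashlib.md5(raw.encode()).hexdigest()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_txn (id INTEGER PRIMARY KEY, txn_hash TEXT)")
conn.execute("CREATE INDEX idx_hash ON temp_txn (txn_hash)")

rows = [(1, 42, "B-100", "2014-10-01", "S1"),
        (1, 42, "B-100", "2014-10-01", "S1"),   # duplicate of the first
        (1, 42, "B-101", "2014-10-01", "S1")]
conn.executemany("INSERT INTO temp_txn (txn_hash) VALUES (?)",
                 [(txn_hash(*r),) for r in rows])

# Group on the single indexed hash column instead of five raw columns.
dupes = conn.execute("""SELECT txn_hash, COUNT(*) AS c FROM temp_txn
                        GROUP BY txn_hash HAVING c > 1""").fetchall()
print(len(dupes))  # 1 duplicate group, ready for the id-based UPDATE
```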

 

This proved to have better performance, especially when the file size is huge.

 

With this in-house system, we have imported more than 500M records in the last 4 years, and the number is still growing.