Category Archives: General

MySQL – Processing 8.5 Million Rows In a Reasonable Amount of Time

I had to crunch through a database of approximately 8.5 million records computing a hash, validating a few fields, and then updating a column with the results of the hash. Because I needed to compute the hash, I couldn’t just use an UPDATE statement to work on all the rows – I had to read and update each of the 8.5 million rows in my script. Sounds painful already!

On my initial attempt at SELECTing a record, calculating the hash, and UPDATEing it, I was able to do about 150 rows per second… Let’s see, that’ll take about 16 hours… We can do better than that… :)
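
For context, the naive first pass looked roughly like this. This is a minimal sketch, not the real code: the connection URL and the column names (data, row_hash) are placeholders I made up for illustration.

require 'sequel'
require 'digest'

# Hypothetical connection URL and column names (data, row_hash) for illustration only.
DB = Sequel.connect('mysql2://localhost/my_db')

DB[:work_table].order(:id).paged_each do |row|
  # One round trip to read each row, one to write it back: roughly 150 rows/second.
  # (The real job also validated a few fields here.)
  row_hash = Digest::SHA256.hexdigest(row[:data].to_s)
  DB[:work_table].where(:id => row[:id]).update(:row_hash => row_hash)
end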

Skip to TL;DR

Parallelization & Optimization

The first thing I wanted to do was make use of all 8 logical cores (a Core i7 with hyper-threading), which is why I chose Sidekiq. With multiple workers crunching different chunks of data, I could hopefully saturate both I/O and CPU. I used Boson to make my app a simple command-line application, but I could just as easily have used rake tasks.

Here is pass #1 at this task:

Command Line To Start Worker:

#!/usr/bin/env ruby
 
require File.expand_path( '../../config/environment', __FILE__)
 
require 'boson/runner'
require 'sidekiq'
require 'workers/process_worker'
 
class ProcessRunner < Boson::Runner
  def process( num_workers = 10 )
    num_workers = num_workers.to_i
    puts 'spawning workers'

    # Record the start time in Redis so the workers can report total elapsed time later.
    Sidekiq.redis {|conn| conn.set('timer', Time.now.to_f) }
    num_workers.times do | worker_number |
      ProcessWorker.perform_async worker_number, 100
    end
  end
end
 
ProcessRunner.start

Worker:

require 'sidekiq'
require 'sequel'
 
class ProcessWorker
  include Sidekiq::Worker
 
  def perform(worker_number, chunk_size)
    DB.transaction do
      dataset = DB[:work_table].select_all(:work_table).left_outer_join(:output_table, :fk_id => :id).where(:result => nil).limit(chunk_size,worker_number*chunk_size)
      output_table = DB[:output_table]
      dataset.each do |row|
        result = # Process the row data here
 
        output_table.insert(:result => result, :fk_id => row[:id])
      end
 
      # Keep re-enqueuing this worker until a chunk comes back empty.
      if dataset.count > 0
        puts "spawn worker to do more work again."
        ProcessWorker.perform_async worker_number, chunk_size
      else
        start_time = Time.at(Sidekiq.redis {|conn| conn.get('timer') }.to_f)
        end_time = Time.now
 
        puts "Worker ##{worker_number} reports job took #{(end_time - start_time)*1000} milliseconds"
      end
    end
  end
end

My strategy here was to have a secondary table (output_table) that I would dump my results into and join to the primary table (work_table). With the left outer join, I could search for rows in work_table that had NULL in the result field, indicating they had not been processed yet. I tried Sequel for this project because I had never used it before and thought it would be nice to have a Ruby model way of accessing the data. It turned out sorta nice. The data was divided into chunks among the workers, each chunk_size rows large (100 in the example)… So it would look something like this:

--------------------------------------
| row 0-99    | chunk 0 for worker 0 |
| row 100-199 | chunk 1 for worker 1 |
| row 200-299 | chunk 2 for worker 2 |
| row 300-399 | chunk 3 for worker 3 |
| row 400-499 | chunk 4 for worker 4 |
| ...         | ..                   |
--------------------------------------

This algorithm and implementation did not turn out so nice, though! Because I was SELECTing data and processing it non-atomically, if, for instance, worker 1 finished rows 100-199 and asked for another set of data, it would now be working on rows 200-299 at the same time as worker 2, which was supposed to be working on that data…

After processing row 100-199, the new chunk 1 was row 200-299:

--------------------------------------
| row 0-99    | chunk 0 for worker 0 |
| row 100-199 | COMPLETED            | (no longer returned by the SELECT query)
| row 200-299 | chunk 1 for worker 1 | <- now both worker 1 and worker 2
| row 300-399 | chunk 2 for worker 2 |    are working on these rows
| row 400-499 | chunk 3 for worker 3 |
| ...         | ..                   |
--------------------------------------

Bad, bad, bad race conditions… inefficient processing of data… still only about 300 rows processed per second… roughly 8 hours is still too long for the job… back to Google, StackOverflow, and the MySQL manual…

MySQL Bulk Data Recommendations (LOAD DATA INFILE)

The MySQL documentation has a lot of great tips for optimizing bulk inserts, but the most useful one I found was this:

When loading a table from a text file, use LOAD DATA INFILE. This is usually 20 times faster than using INSERT statements. See Section 13.2.6, “LOAD DATA INFILE Syntax”.

Speed of INSERT Statements

I’ll take a 20x speedup! Let’s rewrite the code to take advantage of LOAD DATA INFILE… I’ll be using temporary CSV files.

Job Invoker:

  def process( num_workers = 10 )
    # Store the worker count in Redis so each worker can compute its own offsets.
    Sidekiq.redis {|conn| conn.set('process_workers', num_workers.to_i) }
    puts 'spawning workers'
 
    Sidekiq.redis {|conn| conn.set('timer', Time.now.to_f) }
    num_workers.times do | worker_number |
      ProcessWorker.perform_async worker_number, 1000
    end
  end

Worker:

require 'sidekiq'
require 'sequel'
require 'sequel/load_data_infile'
 
class ProcessWorker
  include Sidekiq::Worker
 
  def total_num_of_workers
    Sidekiq.redis {|conn| conn.get('process_workers') }
  end
 
  def perform(worker_number, chunk_size, current_count = 0)
    file_data = File.open("/tmp/worker_#{worker_number}_output#{current_count}.txt", 'w')
 
    dataset = DB[:work_table].select_all(:work_table).left_outer_join(:output_table, :fk_id => :id).limit(chunk_size,total_num_of_workers.to_i*chunk_size*current_count+worker_number*chunk_size)
    output_table = DB[:output_table]
    dataset.each do |row|
      result = # Process the row data here
 
      file_data.puts "#{row[:id]},#{result}"
    end
 
    file_data.close
 
    output_table.load_csv_infile("/tmp/worker_#{worker_number}_output#{current_count}.txt", [ :fk_id, :result ])
 
 
    ## TODO: it'd be nice if we clean up the tmp directory when we're done.
 
    if dataset.count > 0
      puts "spawn worker to do more work again."
 
      ProcessWorker.perform_async worker_number, chunk_size, current_count+1
    else
      start_time_raw=Sidekiq.redis {|conn| conn.get('timer') }
      start_time=Time.at(start_time_raw.to_f)
      end_time = Time.now
 
      puts "Worker ##{worker_number} reports job took #{(end_time - start_time)*1000} milliseconds"
    end
  end
end

I made the following changes to the code:

  • I’m using the sequel load_data_infile gem… love that there’s a gem for everything. :)
  • I avoided the race condition by using LIMIT and OFFSET to always advance through the table instead of basing the query on data that was still being processed.
  • We write each processed batch to a CSV file and have MySQL import it (a rough raw-SQL sketch of that import is shown below).
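
For reference, that CSV import boils down to something like the raw SQL below. This is a minimal sketch based on the documented LOAD DATA INFILE syntax, not the exact statement the load_data_infile gem generates (it may add IGNORE or character-set/escaping clauses):

# Roughly what output_table.load_csv_infile(...) does for one worker's file.
# Drop LOCAL if the MySQL server can read /tmp directly and has the FILE privilege.
DB.run(<<~'SQL')
  LOAD DATA LOCAL INFILE '/tmp/worker_0_output0.txt'
  INTO TABLE output_table
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  (fk_id, result)
SQL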

Let’s give it a run…

Worker #1 reports validate job took 2153123.5738263847 milliseconds

Yay! We were finally able to process the full dataset in a reasonable amount of time! A little over half an hour is not too bad for 8.5 million records, right? Actually, we can probably optimize this a bit more…

Scanning By Primary ID instead of LIMIT/OFFSET

I noticed that the workers processing data at the beginning of the table were returning fast (1-5 seconds), but as we reached the end of the table, around the 4 million mark, it got extremely slow and the CPU would start working really hard.

I did a little digging and found that the issue is that LIMIT/OFFSET has to scan the ENTIRE dataset up to the offset you ask for, discard all of those leading rows, and then return the amount of data you want. In other words, MySQL was reading through 4,001,000 records to give me records 4,000,000-4,001,000, then 4,002,000 records to give me the next chunk, and so forth… No wonder the query was getting slower and slower!
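
To make the difference concrete, here is a minimal Sequel sketch of the two query shapes (the numbers are illustrative; this is not code from the workers):

# Offset pagination: MySQL walks past all 4,000,000 earlier rows, discards them,
# and only then returns the 1,000 we asked for. Each successive chunk is slower.
slow = DB[:work_table].limit(1_000, 4_000_000)

# Keyed pagination: the PRIMARY KEY index seeks straight to the id range.
fast = DB[:work_table].where { (id >= 4_000_000) & (id < 4_001_000) }

puts slow.sql, fast.sql   # compare the generated SQL, or EXPLAIN both in the mysql client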

Let’s fix that:

  def perform(worker_number, chunk_size, current_count = 0)
    file_data = File.open("/tmp/worker_#{worker_number}_output#{current_count}.txt", 'w')
 
 
    low  = total_num_of_workers.to_i*chunk_size*current_count + worker_number*chunk_size
    high = low + chunk_size
    dataset = DB[:work_table].select_all(:work_table).left_outer_join(:output_table, :fk_id => :id).where{(Sequel.qualify(:work_table,:id) >= low)}.where{Sequel.qualify(:work_table,:id) < high}
    # debugging
    puts dataset.sql
    output_table = DB[:output_table]
    dataset.each do |row|
      result = # Process the row data here
 
      file_data.puts "#{row[:id]},#{result}"
    end
 
    file_data.close
 
    output_table.load_csv_infile("/tmp/worker_#{worker_number}_output#{current_count}.txt", [ :fk_id, :result ])
 
 
    ## TODO: it'd be nice if we clean up the tmp directory when we're done.
 
    last_auto_increment_id = DB["SELECT `AUTO_INCREMENT` FROM  INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'my_db' AND TABLE_NAME = 'work_table'"].first[:AUTO_INCREMENT]
    if total_num_of_workers.to_i*chunk_size*current_count < last_auto_increment_id
      puts "spawn worker to do more work again."
 
      ProcessWorker.perform_async worker_number, chunk_size, current_count+1
    else
      start_time_raw = Sidekiq.redis {|conn| conn.get('timer') }
      start_time=Time.at(start_time_raw.to_f)
      end_time = Time.now
 
      puts "Worker ##{worker_number} reports validate job took #{(end_time - start_time)*1000} milliseconds"
    end
  end
end

We are now scanning the table by primary ID, from 1 up to the AUTO_INCREMENT counter, so we are guaranteed to cover all the rows. This SELECT method was fast! If there was no data in a range, the query returned almost instantaneously. I had some gaps in my table from deletes, so it was critical that the query came back quickly when it had no results; overall, I probably only lost a few trivial seconds skipping over deleted IDs. Let’s run our benchmark again:

Worker #9 reports validate job took 208492.26823838233 milliseconds

Wow! We’re able to process through 8.5 million records within FOUR MINUTES! That is certainly fast enough for me to be working with this database on a regular basis.

Applying What We Learned to UPDATEs

It is unfortunate that we can’t use LOAD DATA INFILE for updates, but let’s see if we can apply the primary-key scan and parallelization techniques to UPDATE-ing the existing output_table that we created.

require 'sidekiq'
require 'sequel'
 
class ProcessUpdateWorker
  include Sidekiq::Worker
 
  def total_num_of_workers
    Sidekiq.redis {|conn| conn.get('process_update_workers') }
  end
 
 
  def perform(worker_number, chunk_size, current_count = 0)
    DB.run('SET autocommit=0;')
    DB.run('SET unique_checks=0;')
    DB.run('SET foreign_key_checks=0;')
 
 
    DB.transaction do
      low  = total_num_of_workers.to_i*chunk_size*current_count + worker_number*chunk_size
      high = low + chunk_size

      dataset = DB[:work_table].select_all(:work_table).select_append(:data).left_outer_join(:output_table, :fk_id => :id).where(:result2 => nil).where{(Sequel.qualify(:work_table,:id) >= low)}.where{Sequel.qualify(:work_table,:id) < high}
 
      output_table = DB[:output_table]
      puts dataset.sql
      dataset.each do |row|
        result2 = # process row here
        output_table.where("fk_id= ?", row[:id]).update(:result2 => "#{result2}")
      end
 
      last_auto_increment_id = DB["SELECT `AUTO_INCREMENT` FROM  INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'my_db' AND TABLE_NAME = 'work_table'"].first[:AUTO_INCREMENT]
      if total_num_of_workers.to_i*chunk_size*current_count < last_auto_increment_id
        puts "spawn worker to do more work again."
        ProcessUpdateWorker.perform_async worker_number, chunk_size, current_count+1
      else
        start_time=Time.at(Sidekiq.redis {|conn| conn.get('validate_timer') }.to_f)
        end_time = Time.now
 
        puts "Worker ##{worker_number} reports job took #{(end_time - start_time)*1000} milliseconds"
      end
    end
 
    # Restore the settings we disabled above.
    DB.run('SET autocommit=1;')
    DB.run('SET unique_checks=1;')
    DB.run('SET foreign_key_checks=1;')
 
  end
end

For this job, I added the recommended settings to turn off autocommit, unique_checks, and foreign_key_checks (and turn them back on at the end), but with my database I don’t think I saw any improvement; I wasn’t really using unique or foreign keys much.

Anyways, after running this job, I was able to make 8.5 million updates in 26 minutes:

Worker #6 reports validate job took 1557370.9786546987 milliseconds

Not that bad, but could we make this faster??

LOAD DATA INFILE to temp table + UPDATE by JOIN

Doing more research, I saw that you can UPDATE a column in one table from another table via a join. That gave me the idea of loading the column I wanted to change into a tmp table and then overwriting the column in output_table in one shot. Would it work?
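
The statement itself is just MySQL’s multi-table UPDATE syntax; this is the same query the last worker runs at the end of the job below, reformatted for readability:

# Overwrite output_table.result2 with the freshly computed values in tmp_table,
# matching rows on the foreign key.
DB.run(<<~SQL)
  UPDATE output_table, tmp_table
  SET    output_table.result2 = tmp_table.result2
  WHERE  output_table.fk_id   = tmp_table.fk_id
SQL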

Job Invoker:

  def bulk_insert_then_update( num_workers = 10)
    Sidekiq.redis {|conn| conn.set('bulk_insert_then_update_num_workers', num_workers.to_i)}
    Sidekiq.redis {|conn| conn.del('worker_complete_count')}
    puts 'starting bulk insert then update job'
 
    Sidekiq.redis {|conn| conn.set('bulk_insert_then_update_timer', Time.now.to_f) }
    DB.create_table! :tmp_table do
      primary_key :id
      foreign_key :fk_id, :work_table
      String :result2
    end
    num_workers.times do | worker_number |
      BulkInsertThanUpdate.perform_async worker_number, 20000
    end
  end

Worker:

require 'sidekiq'
require 'sequel'
 
class BulkInsertThanUpdate
  include Sidekiq::Worker
 
  def total_num_of_workers
    Sidekiq.redis {|conn| conn.get('bulk_insert_then_update_num_workers') }
  end
 
  def perform(worker_number, chunk_size, current_count = 0)
    file_data = File.open("/tmp/worker_#{worker_number}_output#{current_count}.txt", 'w')
 
    low  = total_num_of_workers.to_i*chunk_size*current_count + worker_number*chunk_size
    high = low + chunk_size
    dataset = DB[:work_table].select_all(:work_table).select_append(Sequel.qualify(:output_table,:result)).left_outer_join(:output_table, :fk_id => :id).where(Sequel.qualify(:output_table,:result2) => nil).where{(Sequel.qualify(:work_table,:id) >= low)}.where{Sequel.qualify(:work_table,:id) < high}
    puts dataset.sql
    tmp_table = DB[:tmp_table]
    dataset.each do |row|
      result2 = # calculate results from row here
      file_data.puts "#{row[:id]},#{result2}"
    end
 
    file_data.close
 
    tmp_table.load_csv_infile("/tmp/worker_#{worker_number}_output#{current_count}.txt", [ :fk_id, :result2 ])
 
 
    ## TODO: it'd be nice if we clean up the tmp directory when we're done.
 
    last_auto_increment_id = DB["SELECT `AUTO_INCREMENT` FROM  INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'my_db' AND TABLE_NAME = 'work_table'"].first[:AUTO_INCREMENT]
    if total_num_of_workers.to_i*chunk_size*current_count < last_auto_increment_id
      puts "spawn worker to do more work again."
 
      BulkInsertThanUpdate.perform_async worker_number, chunk_size, current_count+1
    else
      complete_worker_count = Sidekiq.redis {|conn| conn.incr('worker_complete_count') }

      # The last worker to finish runs the single UPDATE-via-join and drops the tmp table.
      if complete_worker_count.to_i == total_num_of_workers.to_i
        puts "all workers complete - running bulk sql update command..."
        DB.run('UPDATE output_table,tmp_table SET output_table.result2 = tmp_table.result2 WHERE output_table.fk_id = tmp_table.fk_id;')
        DB.run('DROP TABLE tmp_table')
      end
      start_time_raw=Sidekiq.redis {|conn| conn.get('bulk_insert_then_update_timer') }
      start_time=Time.at(start_time_raw.to_f)
      end_time = Time.now
 
      puts "Worker ##{worker_number} reports validate job took #{(end_time - start_time)*1000} milliseconds"
    end
  end
end

My strategy here was to do the same thing as the LOAD DATA INFILE pass, except the results go into tmp_table; then, when all the workers are done, the data is copied from tmp_table into output_table with a single UPDATE join. I used the last worker to finish to run that query, by keeping a count of completed workers in Redis.

What’s the runtime?

Worker #7 reports validate job took 394662.35620292247 milliseconds

We got updates down to under 7 minutes!

Final Optimization Notes

To tune the system, you want to pay close attention to the CPU time and the iowait time, which you can see with top. If the iowait time is high, consider lowering the chunk size to work on smaller chunks at a time; I was able to keep my iowait under 5%. If the CPU isn’t fully loaded, feel free to up the number of workers.

For my system:

Intel Core i7-3770 @ 3.4GHz
16GB RAM
256GB Samsung 850 Pro SSD
MySQL 5.5

I ended up with 20 workers and a chunk size of 20,000 (rows selected at once). When I tried increasing the number of workers beyond that, it actually had a negative effect on the benchmark, so there is a point of diminishing returns. If you have more RAM, I would also consider tuning the MySQL server and even trying to keep the whole DB buffered in memory (innodb_buffer_pool_size). MySQLTuner is a good resource, as is dba.stackexchange.com.
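
As a quick sanity check (a minimal sketch, assuming the same Sequel DB handle as the workers above), you can print the current buffer pool size before deciding whether to raise innodb_buffer_pool_size in my.cnf; on MySQL 5.5 it is not a dynamic variable, so changing it requires a server restart.

# Print the current InnoDB buffer pool size in GB (assumes the DB handle used throughout this post).
row = DB["SHOW VARIABLES LIKE 'innodb_buffer_pool_size'"].first
puts "innodb_buffer_pool_size: #{row[:Value].to_f / 1024**3} GB"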

TL;DR

  • Parallelize the work with Sidekiq – make full use of modern multi-core PCs!
  • Use LOAD DATA INFILE over INSERT statements
  • Scan the table by primary key with WHERE id BETWEEN start AND end rather than using LIMIT/OFFSET
  • Large updates can be made by loading data into a tmp_table with LOAD DATA INFILE and then updating the column via a join with the tmp_table
  • Watch your CPU usage and iowait time, and optimize so that CPU utilization is high and iowait is low

Finally:

8.5 million rows can be SELECT’d and INSERT’d in 3.5 minutes (1.5 minutes after the update below).
8.5 million rows can be SELECT’d and UPDATE’d in 7 minutes (4.5 minutes after the update below).

**Update** I made it even faster by learning how to truly parallelize Sidekiq!

I would love to hear if you know of better techniques that are even faster than this!

Site Launch!

Welcome to JMC Computer Consulting. I have been wanting to put a pretty website storefront on my consulting work for years, but I never had the opportunity, mostly because building my own site would be as much work as working for a client – but without the pay! Fortunately, in the past year, WordPress has advanced to the stage where using it for more than a blog is really feasible. The ease of management WordPress provides is wonderful! After using it for a few sites and seeing how nice it was, I took the dive and built this site. :-)

What can you expect from this blog?

Well, I already have my geeky system administrator blog over at bashusr.com, so you shouldn’t expect me to repeat much of that here. I plan to use this blog mainly for sharing my knowledge of programming and software development. The rest of the site is for attracting clients, and I hope it will increase my job opportunities!

Stay tuned, and thanks for visiting!

Jonathan