Tuesday, March 25 2014

Load data into PostgreSQL. Fast.

The pgloader software is made with a single goal in mind: load data into PostgreSQL from any supported source as fast as possible.

Currently pgloader supports data from plain files in the CSV format and the fixed column width format, as well as database-like data sources such as dBase III and SQLite files; pgloader even supports full migration from a live MySQL database.

A first example

In all those cases the mode of operation is about the same: you prepare a command file that looks like the following, then run the command pgloader csv-districts.load to start loading the data.

/*
 * The data file comes from the US census website:
 *
 * http://www.census.gov/geo/maps-data/data/gazetteer2013.html
 *
 * We import it directly into pgloader git repository so that we have at
 * least a CSV test where we read from a local file...
 */

LOAD CSV
     FROM data/2013_Gaz_113CDs_national.txt
      (
         usps,          -- United States Postal Service State Abbreviation
         geoid,         -- Geographic Identifier
         aland,         -- Land Area (square meters)
         awater,        -- Water Area (square meters)
         aland_sqmi,    -- Land Area (square miles)
         awater_sqmi,   -- Water Area (square miles)
         intptlat,      -- Latitude (decimal degrees)
         intptlong      -- Longitude (decimal degrees)
      )

     INTO postgresql:///pgloader?districts
      (
         usps, geoid, aland, awater, aland_sqmi, awater_sqmi,
         location point using (format nil "(~a,~a)" intptlong intptlat)
      )

     WITH truncate,
          skip header = 1,
          fields terminated by '\t'

   BEFORE LOAD DO
    $$ drop table if exists districts; $$,
    $$ create table districts (
         usps        text,
         geoid       text,
         aland       bigint,
         awater      bigint,
         aland_sqmi  double precision,
         awater_sqmi double precision,
         location    point
       );
    $$;

Other examples and full documentation reference

The main pgloader website provides examples for loading data from all supported data sources, along with the full pgloader reference documentation, where each command clause is reviewed in the context of the kind of data source you want to import.
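
For instance, migrating a whole live MySQL database boils down to a command file along these lines; this is only a sketch, and the connection string and target database shown here are made up for the example:

LOAD DATABASE
     FROM mysql://user@localhost/sakila
     INTO postgresql:///sakila

 WITH include drop, create tables, create indexes, reset sequences;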

Main features

pgloader's main features are as follows:

  • parallel reading and writing of the data, for performance,
  • the ability to transform the data while reading it,
  • the set of fields read and the set of columns written can differ,
  • the transformation happens within the data pipeline,
  • support for user-provided transformation routines (see the sketch after this list),
  • those routines are compiled to machine code before being run,
  • good error handling, allowing pgloader to process both good and bad rows from data sources,
  • good flexibility in source format support,
  • built-in support for MySQL, SQLite, dBase, CSV, and fixed column width files.
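
As an illustration of the transformation support, here is a minimal sketch of a user-provided transformation function. pgloader transforms are plain Common Lisp functions living in the pgloader.transforms package, so they get compiled to machine code like the rest of the pipeline; the zip-to-text function itself is hypothetical, not something shipped with pgloader:

(in-package :pgloader.transforms)

;; A hypothetical user-provided transform: normalize a US zip code
;; read as text, padding it back to five digits with leading zeroes.
(defun zip-to-text (code)
  (let ((zip (when code (parse-integer code :junk-allowed t))))
    (when zip (format nil "~5,'0d" zip))))

Such a function can then be referenced from the target column list with a using clause, much like the location column uses format in the example above.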

Using pgloader from sources or binary

Compiling pgloader from sources is quite easy, as the process is entirely automated; if you're used to Makefiles you should quickly find your way through it. pgloader relies on a recent enough version of SBCL to run, and can be entirely downloaded (with its dependencies) using the awesome Quicklisp Common Lisp library manager.

It's also possible to directly fetch an already compiled binary package of pgloader; currently such packages are only provided for Debian and CentOS.

Note that if you're already used to programming in Common Lisp with e.g. SLIME (or anything else really) then you should be fine with just entering (ql:quickload "pgloader") at your favorite REPL; then you can play with the pgloader sources directly like this:

CL-USER> (pgloader:run-commands "~/dev/pgloader/test/csv-districts.load" 
                                :client-min-messages :warning)
2014-03-25T21:34:21.002000+01:00 LOG Starting pgloader, log system is ready.
2014-03-25T21:34:21.003000+01:00 LOG Parsing commands from file #P"/Users/dim/dev/pgloader/test/csv-districts.load"

                    table name       read   imported     errors            time
------------------------------  ---------  ---------  ---------  --------------
                   before load          2          2          0          0.080s
------------------------------  ---------  ---------  ---------  --------------
                     districts        440        440          0          0.079s
------------------------------  ---------  ---------  ---------  --------------
             Total import time        440        440          0          0.159s
NIL

If you want to discover the internal APIs of pgloader, maybe the best way is to look at the code it generates from the command file:

CL-USER> (pgloader::with-monitor ()
           (pgloader.parser::parse-commands-from-file "/Users/dim/dev/pgloader/test/csv-districts.load"))
2014-03-25T21:36:20.000000+01:00 LOG Starting pgloader, log system is ready.
2014-03-25T21:36:20.002000+01:00 LOG Parsing commands from file #P"/Users/dim/dev/pgloader/test/csv-districts.load"
((LAMBDA ()
   (LET* ((PGLOADER.PARSER::STATE-BEFORE (PGLOADER.UTILS:MAKE-PGSTATE))
          (PGLOADER.PARSER::SUMMARY (NULL PGLOADER.PARAMS:*STATE*))
          (PGLOADER.PARAMS:*STATE*
           (OR PGLOADER.PARAMS:*STATE* (PGLOADER.UTILS:MAKE-PGSTATE)))
          (PGLOADER.PARSER::STATE-AFTER NIL)
          (PGLOADER.PARAMS:*PGCONN-HOST* ':UNIX)
          (PGLOADER.PARAMS:*PGCONN-PORT* 54393)
          (PGLOADER.PARAMS:*PGCONN-USER* "dim")
          (PGLOADER.PARAMS:*PGCONN-PASS* NIL)
          (PGLOADER.PARAMS:*PG-DBNAME* "pgloader")
          (PGLOADER.PARAMS:*PG-SETTINGS* 'NIL))
     (PROGN
      (PGLOADER.UTILS:WITH-STATS-COLLECTION ("before load" :DBNAME "pgloader"
                                             :STATE
                                             PGLOADER.PARSER::STATE-BEFORE
                                             :USE-RESULT-AS-READ T
                                             :USE-RESULT-AS-ROWS T)
        (PGLOADER.PGSQL:WITH-PGSQL-TRANSACTION (:DBNAME "pgloader")
          (LOOP PGLOADER.PARSER::FOR PGLOADER.PARSER::COMMAND PGLOADER.PARSER::IN '("drop table if exists districts;"
                                                                                    "create table districts (
         usps        text,
         geoid       text,
         aland       bigint,
         awater      bigint,
         aland_sqmi  double precision,
         awater_sqmi double precision,
         location    point
       );")
                DO (PGLOADER.MONITOR:LOG-MESSAGE :NOTICE
                                                 PGLOADER.PARSER::COMMAND) (PGLOADER.PGSQL:PGSQL-EXECUTE
                                                                            PGLOADER.PARSER::COMMAND
                                                                            :CLIENT-MIN-MESSAGES
                                                                            :ERROR)
                PGLOADER.PARSER::COUNTING PGLOADER.PARSER::COMMAND)))
      (LET ((TRUNCATE
             (GETF '(:TRUNCATE T :SKIP-LINES 1 :SEPARATOR #\Tab) :TRUNCATE))
            (PGLOADER.PARSER::SOURCE
             (MAKE-INSTANCE 'PGLOADER.CSV:COPY-CSV :TARGET-DB "pgloader"
                            :SOURCE
                            '(:FILENAME
                              #P"/Users/dim/dev/pgloader/test/data/2013_Gaz_113CDs_national.txt")
                            :TARGET "districts" :ENCODING :UTF-8 :FIELDS
                            '(("usps") ("geoid") ("aland") ("awater")
                              ("aland_sqmi") ("awater_sqmi") ("intptlat")
                              ("intptlong"))
                            :COLUMNS
                            '(("usps" NIL NIL) ("geoid" NIL NIL)
                              ("aland" NIL NIL) ("awater" NIL NIL)
                              ("aland_sqmi" NIL NIL) ("awater_sqmi" NIL NIL)
                              ("location" "point"
                               (FORMAT NIL "(~a,~a)"
                                       PGLOADER.TRANSFORMS::INTPTLONG
                                       PGLOADER.TRANSFORMS::INTPTLAT)))
                            :SKIP-LINES 1 :SEPARATOR #\Tab)))
        (PGLOADER.SOURCES:COPY-FROM PGLOADER.PARSER::SOURCE :TRUNCATE
                                    TRUNCATE))
      NIL
      (WHEN PGLOADER.PARSER::SUMMARY
        (PGLOADER.UTILS:REPORT-FULL-SUMMARY "Total import time"
                                            PGLOADER.PARAMS:*STATE* :BEFORE
                                            PGLOADER.PARSER::STATE-BEFORE
                                            :FINALLY
                                            PGLOADER.PARSER::STATE-AFTER))))))

Attention has been paid throughout the source code to providing enough source-level documentation; open an issue on the pgloader GitHub project if you need more details.

Performances and Program Architecture

The pgloader program uses a pair of threads and a communication queue to perform its data loading.

The first thread reads the data from the source (either a file or a connection string) and prepares batches (pre-allocated vectors of fixed size) of preprocessed (converted) rows. As soon as the *copy-batch-rows* threshold is reached, the batch vector is handed over to the other thread.

The second thread reads the batches from the queue and writes them to a PostgreSQL connection that has been switched into the COPY streaming protocol. When all the rows from the current batch have been sent to PostgreSQL, the COPY command is closed, the current transaction is committed, and the cycle starts again.
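
Here is a minimal sketch of that reader/writer pair, written against the lparallel.queue and bordeaux-threads APIs; read-batch and send-batch-to-pgsql are hypothetical placeholders for the real source readers and COPY machinery:

(defun load-in-parallel (source pgconn &key (batch-rows 25000))
  ;; a bounded queue keeps the reader from running too far ahead
  (let* ((queue (lparallel.queue:make-queue :fixed-capacity 4))
         (reader (bt:make-thread
                  (lambda ()
                    ;; fill fixed-size batch vectors and hand them over
                    (loop for batch = (read-batch source batch-rows)
                          while batch
                          do (lparallel.queue:push-queue batch queue)
                          finally (lparallel.queue:push-queue :end-of-data queue))))))
    ;; meanwhile this thread plays the writer role: one COPY per batch
    (loop for batch = (lparallel.queue:pop-queue queue)
          until (eq batch :end-of-data)
          do (send-batch-to-pgsql pgconn batch))
    (bt:join-thread reader)))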

Data error handling

Errors can happen either when reading the data from the source or when pushing the data to PostgreSQL.

Handling Source reading errors

The error handling capabilities depend on the libraries used to read each kind of source, and to some extent on the kind of error. When possible, pgloader makes an effort to log the error and continue loading.

When that's not possible, the error is logged and the loading stops with a (hopefully) informative message about the problem.
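
This "log and continue" pattern is easy to sketch with Common Lisp conditions; parse-row below is a hypothetical stand-in for whichever source-specific reader is in use:

(defun read-rows-logging-errors (stream)
  ;; collect parsed rows, logging and skipping any line that errors out
  (let (rows)
    (loop for line = (read-line stream nil nil)
          while line
          do (handler-case (push (parse-row line) rows)
               (error (c)
                 ;; log the condition and move on to the next line
                 (pgloader.monitor:log-message :error "~a" c))))
    (nreverse rows)))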

Handling PostgreSQL errors

Some data will get rejected by PostgreSQL, even after being carefully prepared by the transformation functions you can attach to pgloader. When that happens, pgloader parses the PostgreSQL CONTEXT error message, which contains the position within the batch of the row where the error happened.

It's then easy enough to resend all the rows from the batch located before the error, log the faulty row as rejected and skip it, and continue, handling any further errors the same way.
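
That retry logic can be sketched as a simple recursion over the batch; copy-rows, parse-copy-context (which would extract a zero-based row offset from the CONTEXT message) and log-rejected-row are all hypothetical helpers here, not pgloader's actual API:

(defun retry-batch (batch &key (start 0))
  ;; resend BATCH from START onwards, skipping rows PostgreSQL rejects
  (when (< start (length batch))
    (handler-case (copy-rows batch :start start)
      (error (condition)
        (let ((bad (+ start (parse-copy-context condition))))
          ;; rows before the faulty one are known good: resend them
          (copy-rows batch :start start :end bad)
          ;; log the bad row as rejected, then continue right after
          ;; it, handling any further error the same way
          (log-rejected-row (aref batch bad))
          (retry-batch batch :start (1+ bad)))))))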