pgloader, the PostgreSQL parallel ETL, in python

What does mean such a mouthful title? The easiest way to find out about this would be to check out the slides from the first PostgreSQL European Conference, held in Prato. Or to read the following two paragraphs:

The PostgreSQL parallel ETL, writen in python. This little piece of software is a layer atop the PostgreSQL COPY command. The difference between pgloader and plain COPY is the behavior in case of errors in the source data files and the range of input formats supported.

And this little fact that pgloader is able to parallelize its tasks, loading several file at once or even using more than one CPU to load a single source file.

Contributions

Any idea or comment is welcome, on the form of a simple mail or documentation or source code patch. Don't be shy!

As a matter of fact, several of the most advanced features of pgloader are in there only because some intrepid user asked for them. Join the fun and be one of them anytime soon.

Documentation

The fine manual of pgloader is online at the http://pgfoundry web site.

Examples

Check them out in your distribution of pgloader, or directly browse the github pgloader examples directory.

Tutorial

This blog features a pgloader articles series: How To Use PgLoader, How to Setup pgloader, parallel pgloader, pgloader reformating and pgloader constant cols. Here you will find them all as a single long tutorial page.

installing pgloader

Either use the debian package or the one for your distribution of choice if you use another one. RedHat, CentOS, FreeBSD, OpenBSD and some more already include a binary package that you can use directly.

Or you could git clone https://github.com/dimitri/pgloader.git and go from there. As it's all python code, it runs fine interpreted from the source directory, you don't need to install it in a special place in your system.

setting up the test environment

To use them, please first create a pgloader database, then for each example the tables it needs, then issue the pgloader command:

$ createdb --encoding=utf-8 pgloader
$ cd examples
$ psql pgloader < simple/simple.sql
$ ../pgloader.py -Tvc pgloader.conf simple

If you want to load data from all examples, create tables for all of them first, then run pgloader without argument.

running the import

You can launch all those pgloader tests in one run, provided you created the necessary tables:

 $ for sql in */*sql; do psql pgloader < $sql; done
 $ ../pgloader.py -Tsc pgloader.conf

  errors       WARNING  COPY error, trying to find on which line
  errors       WARNING  COPY data buffer saved in /tmp/errors.AhWvAv.pgloader
  errors       WARNING  COPY error recovery done (2/3) in 0.064s
  errors       WARNING  COPY error, trying to find on which line
  errors       WARNING  COPY data buffer saved in /tmp/errors.BclHtj.pgloader
  errors       WARNING  COPY error recovery done (1/1) in 0.054s
  errors       ERROR    3 errors found into [errors] data
  errors       ERROR    please read /tmp/errors.rej.log for errors log
  errors       ERROR    and /tmp/errors.rej for data still to process
  errors       ERROR    3 database errors occured
  reformat     WARNING  COPY error, trying to find on which line
  reformat     WARNING  COPY data buffer saved in /tmp/reformat.6P4WCD.pgloader
  reformat     WARNING  COPY error recovery done (1/4) in 0.034s
  reformat     ERROR    1 errors found into [reformat] data
  reformat     ERROR    please read /tmp/reformat.rej.log for errors log
  reformat     ERROR    and /tmp/reformat.rej for data still to process
  reformat     ERROR    1 database errors occured

  Table name        |    duration |    size |  copy rows |     errors
  ====================================================================
  allcols           |      0.025s |       - |          8 |          0
  clob              |      0.034s |       - |          7 |          0
  cluttered         |      0.061s |       - |          6 |          0
  csv               |      0.035s |       - |          6 |          0
  errors            |      0.113s |       - |          4 |          3
  fixed             |      0.045s |       - |          3 |          0
  partial           |      0.030s |       - |          7 |          0
  reformat          |      0.036s |       - |          4 |          1
  serial            |      0.029s |       - |          7 |          0
  simple            |      0.050s |       - |          7 |          0
  udc               |      0.020s |       - |          5 |          0
  ====================================================================
  Total             |      0.367s |       - |         64 |          4

Please note errors test should return 3 errors and reformat 1 error.

Setting up pgloader.conf

This file is expected in the INI format, with a global section then one section per file you want to import. The global section defines some default options and how to connect to the PostgreSQL server.

The configuration setup is fully documented on the pgloader man page that you can even easily find online. As all unix style man pages, though, it's more a complete reference than introductory material. Let's review.

global section

Here's the global section of the examples/pgloader.conf file of the source files. Well, some options are debugger only options, really, so I changed their value so that what you see here is a better starting point.

[pgsql]
base = pgloader

log_file            = /tmp/pgloader.log
log_min_messages    = INFO
client_min_messages = WARNING

lc_messages         = C
pg_option_client_encoding = 'utf-8'
pg_option_standard_conforming_strings = on
pg_option_work_mem = 128MB

copy_every      = 15000

null         = ""
empty_string = "\ "

max_parallel_sections = 4

You don't see all the connection setup, here base was enough. You might need to setup host, port and user, and maybe even pass, too, to be able to connect to the PostgreSQL server.

The logging options allows you to set a file where to log all pgloader messages, that are categorized as either DEBUG, INFO, WARNING, ERROR or CRITICAL. The options log_min_messages and client_min_messages are another good idea stolen from PostgreSQL and allow you to setup the level of chatter you want to see on the interactive console (standard output and standard error streams) and on the log file.

Please note that the DEBUG level will produce more that 3 times as many data as the data file you're importing. If you're not a pgloader contributor or helping them, well, debug it, you want to avoid setting the log chatter to this value.

The client_encoding will be SET by pgloader on the PostgreSQL connection it establish. You can now even set any parameter you want by using the pg_option_parameter_name magic settings. Note that the command line option --pg-options (or -o for brevity) allows you to override that.

Then, the copy_every parameter is set to 5 in the examples, because the test files are containing less than 10 lines and we want to test several batches of commits when using them. So for your real loading, stick to default parameters (10 000 lines per COPY command), or more. You can play with this parameter, depending on the network (or local access) and disk system you're using you might see improvements by reducing it or enlarging it. There's no so much theory of operation as empirical testing and setting here. For a one-off operation, just remove the lines from the configuration.

The parameters null and empty_string are related to interpreting the data in the text or csv files you have, and the documentation is quite clear about them. Note that you have global setting and per-section setting too.

The last parameter of this example, max_parallel_sections, is detailed later in the article.

files section

After the global section come as many sections as you have file to load. Plus the template sections, that are only there so that you can share a bunch of parameters in more than one section. Picture a series of data file all of the same format, the only thing that will change is the filename. Use a template section in this case!

Let's see an example:

[simple_tmpl]
template     = True
format       = text
datestyle    = dmy
field_sep    = |
trailing_sep = True

[simple]
use_template    = simple_tmpl
table           = simple
filename        = simple/simple.data
columns         = a:1, b:3, c:2
skip_head_lines = 2

# those reject settings are defaults one
reject_log   = /tmp/simple.rej.log
reject_data  = /tmp/simple.rej

[partial]
table        = partial
format       = text
filename     = partial/partial.data
field_sep    = %
columns      = *
only_cols    = 1-3, 5

That's 2 of the examples from the examples/pgloader.conf file, in 3 sections so that we see one template example. Of course, having a single section using the template, it's just here for the example.

data file format

The most important setting that you have to care about is the file format. Your choice here is either text, csv or fixed. Mostly, what we are given nowadays is csv. You might remember having read that the nice thing about standards is that there's so many to choose from... well, the csv land is one where it's pretty hard to find different producers that understand it the same way.

So when you fail to have pgloader load your mostly csv files with a csv setup, it's time to consider using text instead. The text file format accept a lot of tunables to adapt to crazy situations, but is all python code when the python csv module is a C-coded module, more efficient.

If you're wondering what kind of format we're talking about here, here's the cluttered pgloader example for your reading pleasure, using ^ (carret) as the field separator:

1^some multi\
line text with\
newline escaping^and some other data following^
2^and another line^clean^
3^and\
a last multiline\
escaped line
with a missing\
escaping^just to test^
4^\ ^empty value^
5^^null value^
6^multi line\
escaped value\
\
with empty line\
embeded^last line^

And here's what we get by loading that:

pgloader/examples$ pgloader -c pgloader.conf -s cluttered
Table name        |    duration |    size |  copy rows |     errors
====================================================================
cluttered         |      0.193s |       - |          6 |          0

pgloader/examples$ psql pgloader -c "table cluttered;"
 a |               b               |        c
---+-------------------------------+------------------
 1 | and some other data following | some multi
                                   : line text with
                                   : newline escaping
 2 | clean                         | and another line
 3 | just to test                  | and
                                   : a last multiline
                                   : escaped line
                                   : with a missing
                                   : escaping
 4 | empty value                   |
 5 | null value                    |
 6 | last line                     | multi line
                                   : escaped value
                                   :
                                   : with empty line
                                   : embeded
(6 rows)

So when you have such kind of data, well, it might be that pgloader is still able to help you!

Please refer to the pgloader man page to know about each and every parameter that you can define and the values accepted, etc. And the fixed data format is to be used when you're not given a field separator but field positions in the file. Yes, we still encounter those from time to time. Who needs variable size storage, after all?

parallel pgloader

Parallelism is implemented in 3 different ways in pgloader.

several files at a time

First, you can load more than one file at a time thanks to the max_parallel_sections parameter, that has to be setup in the global section of the file.

This setting is quite simple and already allows the most common use case.

several workers per file

The other use case is when you have huge files to load into the database. Then you want to be able to have more than one process reading the file at the same time. Using pgloader, you already did the compromise to load the whole content in more than one transaction, so there's no further drawback here about having those multiple transactions per file spread to more than one load worker.

There are basically two ways to split the work between several workers here, and both are implemented in pgloader.

N workers, N splits of the file
section_threads    = 4
split_file_reading = True

Setup this way, pgloader will launch 4 different threads (see the caveat section of this article). Each thread is then given a part of the input data file and will run the whole usual pgloader processing on its own. For this to work you need to be able to seek in the input stream, which might not always be convenient.

one reader, N workers
section_threads    = 4
split_file_reading = False
rrqueue_size       = 5000

With such a setup, pgloader will start 4 different worker threads that will receive the data input in an internal python queue. Another active thread will be responsible of reading the input file and filling the queues in a round robin fashion, but will hand all the processing of the data to each worker, of course.

how many threads?

If you're using a mix and match of max_parallel_sections and section_threads with split_file_reading set to True of False, it's uneasy to know exactly how many threads will run at any time in the loading. How to ascertain which section will run in parallel when it depends on the timing of the loading?

The advice here is the usual one, don't overestimate the capabilities of your system unless you are in a position to check before by doing trial runs.

caveat

Current implementation of all the parallelism in pgloader has been done with the python threading API. While this is easy enough to use when you want to exchange data between threads, it's suffering from the Global Interpreter Lock issue. This means that while the code is doing its processing in parallel, the runtime not so much. You might still benefit from the current implementation if you have hard to parse files, or custom reformat modules that are part of the loading bottleneck.

future

The solution would be to switch to using the newer python multiprocessing API, and some preliminary work has been done in pgloader to allow for that. If you're interested in real parallel bulk loading, contact-me!

reformating data

Here's what the pgloader documentation has to say about this reformat parameter: The value of this option is a comma separated list of columns to rewrite, which are a colon separated list of column name, reformat module name, reformat function name.

And here's the examples/pgloader.conf section that deals with reformat:

[reformat]
table           = reformat
format          = text
filename        = reformat/reformat.data
field_sep       = |
columns         = id, timestamp
reformat        = timestamp:mysql:timestamp

The documentation says some more about it, so check it out. Also, the reformat_path option (set either on the command line or in the configuration file) is used to find the python module implementing the reformat function. Please refer to the manual as to how to set it.

Now, obviously, for the reformat to happen we need to write some code. That's the whole point of the option: you need something very specific, you are in a position to write the 5 lines of code needed to make it happen, pgloader allows you to just do that. Of course, the code needs to be written in python here, so that you can even benefit from the parallel pgloader settings.

Let's see an reformat module exemple, as found in reformat/mysql.py in the pgloader sources:

# Author: Dimitri Fontaine <dim@tapoueh.org>
#
# pgloader mysql reformating module
#

def timestamp(reject, input):
    """ Reformat str as a PostgreSQL timestamp

    MySQL timestamps are like:  20041002152952
    We want instead this input: 2004-10-02 15:29:52
    """
    if len(input) != 14:
        e = "MySQL timestamp reformat input too short: %s" % input
        reject.log(e, input)

    year    = input[0:4]
    month   = input[4:6]
    day     = input[6:8]
    hour    = input[8:10]
    minute  = input[10:12]
    seconds = input[12:14]

    return '%s-%s-%s %s:%s:%s' % (year, month, day, hour, minute, seconds)

This reformat module will transform a timestamp representation as issued by certain versions of MySQL into something that PostgreSQL is able to read as a timestamp.

If you're in the camp that wants to write as little code as possible rather than easy to read and maintain code, I guess you could write it this way instead:

import re
def timestamp(reject, input):
    """ 20041002152952 -> 2004-10-02 15:29:52 """
    g = re.match(r"(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})", input)
    return '%s-%s-%s %s:%s:%s' % tuple([g.group(x+1) for x in range(6)])

Whenever you have an input file with data that PostgreSQL chokes upon, you can solve this problem from pgloader itself: no need to resort to scripting and a pipelines of awk (which I use a lot in other cases, don't get me wrong) or other tools. See, you finally have an excuse to Dive into Python!

Adding constant cols

The basic situation where you need to do so is adding an origin field to your table. The value of that is not to be found in the data file itself, typically, but known in the pgloader setup. That could even be the filename you are importing data from.

In pgloader that's called a user defined column. Here's what the relevant examples/pgloader.conf setup looks like:

[udc]
table           = udc
format          = text
filename        = udc/udc.data
input_encoding  = 'latin1'
field_sep       = %
columns         = b:2, d:1, x:3, y:4
udc_c           = constant value
copy_columns    = b, c, d

And the data file is:

1%5%foo%bar
2%10%bar%toto
3%4%toto%titi
4%18%titi%baz
5%2%baz%foo

And here's what the loaded table looks like:

pgloader/examples$ pgloader -Tsc pgloader.conf udc
Table name        |    duration |    size |  copy rows |     errors
====================================================================
udc               |      0.201s |       - |          5 |          0

pgloader/examples$ psql --cluster 8.4/main pgloader -c "table udc"
 b  |       c        | d
----+----------------+---
  5 | constant value | 1
 10 | constant value | 2
  4 | constant value | 3
 18 | constant value | 4
  2 | constant value | 5
(5 rows)

Of course the configuration is not so straightforward as to process fields in the data file in the order that they appear, after all the examples/pgloader.conf are also a test suite.

Long story short: if you need to add some constant values into the target table you're loading data to, pgloader will help you there!

That's it

You've just reach the end of the pgloader tutorial, time to go import your data!