Over the flows

personal space about software engineering


Test an FTP data pipeline with pytest & docker

System integration tests are difficult to write and difficult to run on a continuous integration platform.

Not anymore. I will convince you that they are easier to write and to run on a continuous integration platform than you might think. Together, we are going to write the system integration tests of a data pipeline that downloads data from OpenWeather and deposits it on an FTP server.

[Figure: ETL architecture]

Fixtup is an open source Python library for testing a Python application beyond code boundaries. It manages the creation of disposable environments and is compatible with any CI platform that supports docker-compose, such as GitLab CI or GitHub Actions. The library works with pytest and unittest and runs on Linux, macOS and Windows.

By the end of this article, you will know how to test a Python data pipeline with pytest and fixtup, and how to write system integration tests that verify the behavior of your code against external resources. These tests will also reassure the next developer who has to upgrade your stack.

FTP servers are still here in 2022

Publishing data to an FTP server is still omnipresent in the industry, even in 2022. An FTP server is a robust way to share data that is easy to deploy and to operate. Everyone knows how to connect to an FTP server and read or write files on it.

No newer standard has managed to relegate it to the closet. Instead, FTP tends to work alongside its would-be replacements, for example cloud storage solutions like AWS S3, Azure Blob Storage or Google Cloud Storage.

In this article, the data pipeline is an ETL that regularly publishes a dataset to an FTP server.

Write a data pipeline

Every minute, this data pipeline retrieves the wind speed and humidity for the city of Lyon, France, from the OpenWeather API. Every 10 minutes, it publishes them to an FTP server as a CSV file.

timestamp          ok  wind_speed  humidity
2022-03-11T12:01   1   15          0.1
2022-03-11T12:02   1   13          4
2022-03-11T12:08   0
2022-03-11T12:09   1   4           4.1

Every 10 minutes, it exports a CSV file named after the end of the 10-minute interval it covers: file-2022-03-11T12-10.csv, for example, holds the records collected on 2022-03-11 between 12:01 and 12:10. A file never changes once it has been written.
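The naming rule above can be expressed as a small function. This is a minimal sketch, assuming a file named hh-m0 covers the minutes from hh:m1 to hh:m0 inclusive; the helper name filename_for is hypothetical and not part of the ETL.

```python
from datetime import datetime, timedelta


def filename_for(ts: datetime) -> str:
    """Return the name of the CSV file that should hold a record taken at ts.

    Hypothetical helper: file-...T12-10.csv covers the minutes 12:01 to 12:10
    inclusive, so timestamps are rounded *up* to the next 10-minute boundary
    (a timestamp already on a boundary stays there).
    """
    # Work at minute granularity: 12:10:45 still belongs to the 12:10 minute
    minute = ts.replace(second=0, microsecond=0)
    # Start of the 10-minute slot, e.g. 12:07 -> 12:00
    slot_start = minute.replace(minute=minute.minute - minute.minute % 10)
    # Round up unless the minute is exactly on a boundary
    slot_end = slot_start if slot_start == minute else slot_start + timedelta(minutes=10)
    return f"file-{slot_end.strftime('%Y-%m-%dT%H-%M')}.csv"
```

For instance, 12:01, 12:09 and 12:10 all map to file-2022-03-11T12-10.csv, while 12:11 starts file-2022-03-11T12-20.csv. Using timedelta for the round-up also handles the hour and day rollovers (23:55 maps to the 00-00 file of the next day).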

Test the data producer with a real ftp server

System integration tests validate the behavior of a module against an external system and guard against regressions over time.

These tests provide guarantees when the Python version changes or when the ftputil library needs an upgrade: the continuous integration platform checks, against a real FTP server, that even simple write operations still work.

Test the data producer with pytest and fixtup

Fixtup uses Docker to mount disposable environments. It builds a complete environment with a single line in your test, and you can debug one or more tests in PyCharm or VS Code without having to start the environment yourself.

import fixtup


def test_that_mounts_an_ftp_server():
    # The ftp_server environment is up for the whole body of the with block
    with fixtup.up('ftp_server'):
        # Arrange
        # Act
        # Assert
        pass

Write a dataset on the FTP server

Our ETL code downloads data from OpenWeather every minute and writes it to an FTP server every 10 minutes.

[Figure: ETL architecture]

Here is an extract of the ETL code. The complete code is in the examples of the fixtup repository.

import csv
import dataclasses
import os
import tempfile
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import ftputil


@dataclasses.dataclass
class OpenWeatherDataset:
    # (timestamp, OpenWeather API response) pairs; None means the call failed
    records: List[Tuple[datetime, Optional[dict]]] = dataclasses.field(default_factory=list)

    def enqueue(self, time: datetime, record: Optional[dict]):
        self.records.append((time, record))

    def pop(self) -> List[Tuple[datetime, Optional[dict]]]:
        # Return the buffered records and empty the buffer
        records = self.records
        self.records = []

        return records

def pipeline_weather_publication(dataset: OpenWeatherDataset):
    records = dataset.pop()

    ftp_host = os.getenv('FTP_HOST', 'localhost')
    ftp_user = os.getenv('FTP_USER', None)
    ftp_pwd = os.getenv('FTP_PASS', None)

    fieldnames = ['timestamp', 'measure_timestamp', 'ok', 'wind_speed', 'humidity']
    content = []
    for timestamp, record in records:
        row = {}
        row['timestamp'] = timestamp.isoformat()
        if record is not None:
            row['ok'] = 1
            wind_section = record.get('wind', {})
            row['wind_speed'] = wind_section.get('speed', None)
            row['measure_timestamp'] = record.get('dt', 0)
            main_section = record.get('main', {})
            row['humidity'] = main_section.get("humidity")
        else:
            row['ok'] = 0

        content.append(row)

    # Name the file after the end of the 10-minute slot of the latest record
    # (records from 12:01 to 12:10 go to file-...T12-10.csv). Deriving the name
    # from the data instead of the wall clock keeps it deterministic and testable.
    last = records[-1][0] if records else datetime.utcnow()
    last = last.replace(second=0, microsecond=0)
    slot_start = last.replace(minute=last.minute - last.minute % 10)
    file_timestamp = slot_start if slot_start == last else slot_start + timedelta(minutes=10)
    filename = f"file-{file_timestamp.strftime('%Y-%m-%dT%H-%M')}.csv"

    # newline='' lets the csv module control the line endings
    with tempfile.TemporaryFile('w+', newline='') as filep:
        csv_file = csv.DictWriter(filep, fieldnames)
        csv_file.writeheader()
        for row in content:
            csv_file.writerow(row)

        filep.seek(0)

        with ftputil.FTPHost(host=ftp_host, user=ftp_user, passwd=ftp_pwd) as host:
            with host.open(filename, 'w') as ftp_filep:
                ftp_filep.write(filep.read())
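The CSV serialization does not depend on the FTP server, so it could be pulled out into a pure function and covered by fast unit tests. The sketch below is a hypothetical refactoring (records_to_csv is not part of the ETL); it reuses the field names and extraction rules of pipeline_weather_publication and writes to an in-memory io.StringIO buffer instead of a temporary file.

```python
import csv
import io
from datetime import datetime
from typing import List, Optional, Tuple

FIELDNAMES = ['timestamp', 'measure_timestamp', 'ok', 'wind_speed', 'humidity']


def records_to_csv(records: List[Tuple[datetime, Optional[dict]]]) -> str:
    """Serialize (timestamp, OpenWeather record) pairs into CSV text.

    A record of None means the API call failed: the row gets ok=0 and
    empty measurement columns.
    """
    buffer = io.StringIO()
    # DictWriter writes missing keys and None values as empty fields
    writer = csv.DictWriter(buffer, FIELDNAMES)
    writer.writeheader()
    for timestamp, record in records:
        row = {'timestamp': timestamp.isoformat()}
        if record is not None:
            row['ok'] = 1
            row['wind_speed'] = record.get('wind', {}).get('speed')
            row['measure_timestamp'] = record.get('dt', 0)
            row['humidity'] = record.get('main', {}).get('humidity')
        else:
            row['ok'] = 0
        writer.writerow(row)
    return buffer.getvalue()
```

With this split, the pipeline would only have to write records_to_csv(records) to the remote file, and the integration test can focus on what unit tests cannot check: the file actually landing on the FTP server.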

Test writing a dataset on the FTP server

This integration test checks that the function responsible for publishing the data writes a CSV dataset to the FTP server under the expected file name.

import os
from datetime import datetime

import fixtup
import ftputil

# OpenWeatherDataset and pipeline_weather_publication are imported from the ETL module


def test_pipeline_weather_publication_should_export_a_dataset_on_a_ftp():
    with fixtup.up('ftp_server'):
        # Arrange
        dataset = OpenWeatherDataset()
        dataset.enqueue(datetime(2022, 11, 20, 12, 5), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 6), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 7), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 8), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 9), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 10), {'wind': {'speed': 15}, 'main': {'humidity': 3}})

        # Act
        pipeline_weather_publication(dataset)

        # Assert: exactly one file, named after the slot ending at 12:10
        with ftputil.FTPHost(host=os.getenv('FTP_HOST', 'localhost'), user=os.getenv('FTP_USER', None), passwd=os.getenv('FTP_PASS', None)) as host:
            names = host.listdir(host.curdir)
            assert len(names) == 1
            assert 'file-2022-11-20T12-10.csv' in names
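This test only checks the file name. A stricter assertion could also read the file back and check its content. Below is a minimal sketch of a hypothetical helper, assert_weather_csv, that parses the CSV text with csv.DictReader; it assumes the header written by the pipeline.

```python
import csv
import io

EXPECTED_HEADER = ['timestamp', 'measure_timestamp', 'ok', 'wind_speed', 'humidity']


def assert_weather_csv(content: str, expected_rows: int) -> list:
    """Check the header and row count of an exported dataset and return its rows."""
    # newline='' keeps the \r\n line endings written by csv for the reader to handle
    reader = csv.DictReader(io.StringIO(content, newline=''))
    rows = list(reader)
    assert reader.fieldnames == EXPECTED_HEADER, reader.fieldnames
    assert len(rows) == expected_rows, len(rows)
    # Every row must say whether the OpenWeather call succeeded
    assert all(row['ok'] in ('0', '1') for row in rows)
    return rows
```

Inside the with ftputil.FTPHost(...) block of the test, it could be used as: with host.open('file-2022-11-20T12-10.csv', 'r') as remote_file: assert_weather_csv(remote_file.read(), expected_rows=6).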

When the test enters the fixtup.up block, fixtup mounts the docker-compose stack associated with the ftp_server fixture. Fixtup keeps this container running and stops it once all the tests have finished.

Reset the FTP content between tests

Each test should start from an environment in the same state, so we need to clean the FTP server between tests. Fixtup lets us write hooks in Python for that: the hook_setup_data.py hook runs at the start of each test.

tests/fixtures/ftp_server/.hooks/hook_setup_data.py

import os

import ftputil
from ftputil import FTPHost


def ensure_ftp_is_empty(ftp: FTPHost, path: str):
    for name in ftp.listdir(path):
        if name in (".", ".."):
            continue
        # Use the full remote path so this also works when path is not the current directory
        full_path = ftp.path.join(path, name)
        if ftp.path.isfile(full_path):
            ftp.remove(full_path)
        else:
            ftp.rmtree(full_path)


# Connect with the same settings as the tests and empty the FTP root
with ftputil.FTPHost(host=os.getenv('FTP_HOST', 'localhost'), user=os.getenv('FTP_USER', None), passwd=os.getenv('FTP_PASS', None)) as ftp:
    ensure_ftp_is_empty(ftp, ftp.curdir)

A hook can also restore files and folders at the start of each test, so that a test starts with an FTP space that already contains data.
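Such a restore step could look like the sketch below. It is an assumption, not a fixtup feature: the restore_seed_files name and the local seed directory are hypothetical, and the code relies on ftputil's FTPHost.makedirs and FTPHost.upload (assuming ftputil 4 or later, where makedirs accepts exist_ok). It would be called in hook_setup_data.py right after ensure_ftp_is_empty, with the path of the seed directory.

```python
import os
import posixpath


def restore_seed_files(ftp, seed_dir: str, remote_root: str = ".") -> None:
    """Upload every file under the local seed_dir to remote_root on the FTP server.

    ftp is expected to be an ftputil.FTPHost. The local directory tree is
    recreated on the server before its files are uploaded.
    """
    # os.walk is top-down, so parent directories are handled before their children
    for dirpath, _dirnames, filenames in os.walk(seed_dir):
        relative = os.path.relpath(dirpath, seed_dir)
        if relative == ".":
            remote_dir = remote_root
        else:
            # FTP paths always use "/", whatever the local OS separator is
            remote_dir = posixpath.join(remote_root, *relative.split(os.sep))
            ftp.makedirs(remote_dir, exist_ok=True)
        for filename in filenames:
            ftp.upload(os.path.join(dirpath, filename), posixpath.join(remote_dir, filename))
```

Keeping the seed files in a local folder next to the fixture makes the initial FTP state easy to review and version alongside the tests.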

Fixtup is a complete toolkit for writing system integration tests

Upgrading a system without system integration testing is like trying to get a refund for a product without a receipt. It is frustrating and difficult.

Integration tests complement unit tests; they do not replace them. Unit tests validate the behavior of a system up to 100 times faster, but they are confined to the code level. Fixtup lets you write tests that go beyond this boundary.

Fixtup offers tools to control the Docker containers that several tests depend on and to make those tests easier to run, whether you are debugging on your workstation or running them on a continuous integration platform such as GitLab CI or GitHub Actions.

Do you use containers on your CI platform to run system integration tests? For which technical stack do you think you would need to write system integration tests? Let us know in the comments below!

To learn more