Test an FTP data pipeline with pytest & Docker
System integration tests are difficult to write and difficult to run on a continuous integration platform. Or so the saying goes. I will convince you that they are easier to write and run on a CI platform than you might think. Together, we are going to write the system integration tests of a data pipeline that downloads data from Open Weather and publishes it to an FTP server.
Fixtup is an open source Python library for testing a Python application beyond code boundaries. It manages the creation of disposable environments and is compatible with all modern CI platforms that support docker-compose, such as GitLab CI, GitHub Actions, etc. The library works with pytest and unittest and runs on Linux, macOS and Windows.
By the end of this article, you will know how to test a data pipeline in Python with pytest and Fixtup, and how to write system integration tests that verify the behavior of your code against external resources and reduce the apprehension of the next developer who has to upgrade your stack.
The FTP server, still here in 2022
Integrating data with an FTP server is omnipresent in the industry, even in 2022. An FTP server is a robust solution for sharing data that is easy to operate and easy to deploy. Everyone knows how to connect, read and write to an FTP server.
No standard has managed to relegate it to the closet. The FTP protocol works in synergy with its replacements, for example cloud storage solutions like AWS S3, Azure Blob Storage or Google Cloud Storage.
In this article, the data pipeline will be an ETL that regularly publishes a dataset to an FTP server.
Write a data pipeline
This data pipeline retrieves wind and humidity data every minute from the Open Weather API for the city of Lyon, France, and publishes them every 10 minutes to an FTP server as a CSV file.
| timestamp | ok | wind_speed | humidity |
|---|---|---|---|
| 2022-03-11T12:01 | 1 | 15 | 0.1 |
| 2022-03-11T12:02 | 1 | 13 | 4 |
| … | | | |
| 2022-03-11T12:08 | 0 | | |
| 2022-03-11T12:09 | 1 | 4 | 4.1 |
| 2022-03-11T12:09 | 1 | 4 | 4.1 |
It exports a CSV file every 10 minutes, with a name of the form file-2022-03-11T12-10.csv for the increment between 12:01 and 12:10 on 2022-03-11. A file does not change once it has been written.
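The 10-minute bucket in the file name can be computed by rounding the current time down. Here is a minimal sketch of that naming scheme; the helper `increment_filename` is a hypothetical name for illustration, not a function from the ETL:

```python
from datetime import datetime, timedelta


def increment_filename(now: datetime) -> str:
    # Round `now` down to the enclosing 10-minute increment, then format it
    # as file-YYYY-MM-DDTHH-MM.csv, the naming scheme described above.
    file_timestamp = now - timedelta(minutes=now.minute % 10,
                                     seconds=now.second,
                                     microseconds=now.microsecond)
    return f"file-{file_timestamp.strftime('%Y-%m-%dT%H-%M')}.csv"


print(increment_filename(datetime(2022, 3, 11, 12, 10)))  # file-2022-03-11T12-10.csv
```

Any time between 12:10:00 and 12:19:59 maps to the same `12-10` file name.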
Test the data producer with a real ftp server
System integration tests validate the behavior of a module against an external system and guard the code against regressions over time.
These tests offer guarantees when the Python version evolves, or when the ftputil library needs to be updated. With them, it is safe to say that even simple write operations on an external system are validated against a real FTP server by the continuous integration platform.
Test the data producer with pytest and fixtup
Fixtup uses Docker to mount disposable environments. It builds a complete environment in one line in your test and lets you debug one or more tests in PyCharm or VS Code without worrying about starting the environment yourself.
import fixtup

def test_that_mount_a_ftp_server():
    with fixtup.up('ftp_server'):
        # Arrange
        # Act
        # Assert
        pass
Write a dataset on the FTP server
Our ETL downloads data from Open Weather every minute and writes it every 10 minutes to an FTP server.
Here is an extract of the ETL code. You will find the complete code in the examples of the Fixtup repo.
import csv
import dataclasses
import os
import tempfile
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import ftputil


@dataclasses.dataclass
class OpenWeatherDataset:
    # Buffer of (poll time, raw API record); a None record means the poll failed.
    records: List[Tuple[datetime, Optional[dict]]] = dataclasses.field(default_factory=list)

    def enqueue(self, time: datetime, record: dict):
        self.records.append((time, record))

    def pop(self) -> List[Tuple[datetime, dict]]:
        records = self.records
        self.records = []
        return records


def pipeline_weather_publication(dataset: OpenWeatherDataset):
    records = dataset.pop()

    ftp_host = os.getenv('FTP_HOST', 'localhost')
    ftp_user = os.getenv('FTP_USER', None)
    ftp_pwd = os.getenv('FTP_PASS', None)

    fieldnames = ['timestamp', 'measure_timestamp', 'ok', 'wind_speed', 'humidity']
    content = []
    for timestamp, record in records:
        row = {}
        row['timestamp'] = timestamp.isoformat()
        if record is not None:
            row['ok'] = 1
            wind_section = record.get('wind', {})
            row['wind_speed'] = wind_section.get('speed', None)
            row['measure_timestamp'] = record.get('dt', 0)
            main_section = record.get('main', {})
            row['humidity'] = main_section.get("humidity")
        else:
            row['ok'] = 0
        content.append(row)

    # Round the current time down to the enclosing 10-minute increment.
    now = datetime.utcnow()
    file_timestamp = now - timedelta(minutes=now.minute % 10, microseconds=now.microsecond)
    filename = f"file-{file_timestamp.strftime('%Y-%m-%dT%H-%M')}.csv"

    with tempfile.TemporaryFile('a+', newline='') as filep:
        csv_file = csv.DictWriter(filep, fieldnames)
        csv_file.writeheader()
        for row in content:
            csv_file.writerow(row)

        filep.seek(0)
        with ftputil.FTPHost(host=ftp_host, user=ftp_user, passwd=ftp_pwd) as host:
            with host.open(filename, 'w') as ftp_filep:
                ftp_filep.write(filep.read())
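One detail worth noting in the extract above: rows for failed polls only carry `timestamp` and `ok`. `csv.DictWriter` fills missing columns with empty strings by default (its `restval` parameter), which is what produces the blank cells in the table earlier. A quick standalone check:

```python
import csv
import io

fieldnames = ['timestamp', 'measure_timestamp', 'ok', 'wind_speed', 'humidity']
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames)
writer.writeheader()
# A failed poll only has 'timestamp' and 'ok'; DictWriter writes the
# missing columns as empty strings (the default restval='').
writer.writerow({'timestamp': '2022-03-11T12:08:00', 'ok': 0})
print(buffer.getvalue())
```

This is why the pipeline does not need to fill in `wind_speed` or `humidity` explicitly when the API call failed.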
Test writing a dataset on the FTP server
The integration test allows us to verify that the function responsible for publishing the data to an FTP server writes a dataset in CSV format with the expected name format.
import os
from datetime import datetime

import fixtup
import ftputil

def test_pipeline_weather_publication_should_export_a_dataset_on_a_ftp():
    with fixtup.up('ftp_server'):
        # Arrange
        dataset = OpenWeatherDataset()
        dataset.enqueue(datetime(2022, 11, 20, 12, 5), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 6), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 7), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 8), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 9), {'wind': {'speed': 15}, 'main': {'humidity': 3}})
        dataset.enqueue(datetime(2022, 11, 20, 12, 10), {'wind': {'speed': 15}, 'main': {'humidity': 3}})

        # Act
        pipeline_weather_publication(dataset)

        # Assert
        with ftputil.FTPHost(host=os.getenv('FTP_HOST', 'localhost'), user=os.getenv('FTP_USER', None), passwd=os.getenv('FTP_PASS', None)) as host:
            names = host.listdir(host.curdir)
            assert len(names) == 1
            assert 'file-2022-11-20T12-10.csv' in names
When the test runtime enters the fixtup.up instruction, it mounts the docker-compose stack associated with the ftp_server fixture. Fixtup keeps this container up and stops it when all the tests have finished running.
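A Fixtup fixture is essentially a directory that contains, among other things, a docker-compose.yml. Here is an illustrative sketch of what the ftp_server fixture's compose file could look like; the image, credentials and ports are assumptions for illustration, not the exact file from the Fixtup examples:

```yaml
# tests/fixtures/ftp_server/docker-compose.yml -- illustrative sketch only
version: "3"
services:
  ftp_server:
    image: delfer/alpine-ftp-server   # assumed image; any FTP server image works
    environment:
      USERS: "admin|admin"            # user|password, to match FTP_USER/FTP_PASS
    ports:
      - "21:21"
      - "21000-21010:21000-21010"     # passive-mode data ports
```

The passive-mode port range matters because ftputil connects in passive mode by default.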
Reset FTP content between each test
Between tests, it is better to start from an environment in an identical state, so we need to clean the FTP server between each test. Fixtup lets us write hooks in Python for that. The hook_setup_data.py hook runs at the start of each test.
tests/fixtures/ftp_server/.hooks/hook_setup_data.py
import os
from typing import Optional

import ftputil
from ftputil import FTPHost


def ensure_ftp_is_empty(ftp: FTPHost, path: Optional[str]):
    list_doc = ftp.listdir(path)
    for name in list_doc:
        if name != "." and name != "..":
            if ftp.path.isfile(name):
                ftp.remove(name)
            else:
                ftp.rmtree(name)


with ftputil.FTPHost(host=os.getenv('FTP_HOST', 'localhost'), user=os.getenv('FTP_USER', None), passwd=os.getenv('FTP_PASS', None)) as ftp:
    ensure_ftp_is_empty(ftp, ftp.curdir)
In a hook, we can also restore files and folders at the start of each test, to begin with an already populated FTP space.
Fixtup is a complete toolkit to write system integration tests
Upgrading a system without system integration tests is like trying to get a refund for a product without a receipt: frustrating and difficult.
Integration tests complement unit tests; they do not replace them. Unit tests validate the behavior of a system up to 100 times faster, but they are confined to the code level. Fixtup allows you to write tests that go beyond this boundary.
Fixtup offers tools to control the Docker containers that several tests depend on and makes running those tests easier, whether debugging on a developer workstation or running in a continuous integration platform such as GitLab CI, GitHub Actions, etc.
Are you using containers in your CI platform to run system integration tests? On which technical stack do you think you would need to write system integration tests? Let us know in the comments below!