Python Workflow Automation With Prefect (A Better Airflow)

I was a big fan of Apache Airflow. Even today, I don’t have many complaints about it. But the new technology, Prefect, amazed me in many ways, and I can’t help but migrate everything to it.

Prefect, like Airflow, is a workflow automation tool. It lets you orchestrate individual tasks to do more complex work: you can manage task dependencies, retry tasks when they fail, schedule them, and so on.

Workflow management is the backbone of every data science project. Even small projects can benefit remarkably from a tool like Prefect. It eliminates a significant share of repetitive work. With Prefect, you can start automating tasks inside your Jupyter notebook. You can’t run Airflow tasks in a Jupyter notebook.

This article covers some of the frequent questions about Prefect. It includes

  • a short intro to Prefect’s core concepts;
  • why I decided to migrate from Airflow;
  • Prefect’s incredible features and integration with other technologies; and
  • how to decide between its cloud vs. on-premise deployment options.

Quickstart Prefect.

Prefect is both a minimal and a complete workflow management tool. You only have a bunch of Python functions to organize, and it’s unbelievably simple to set up. Yet it can do everything tools such as Airflow can, and more.

You can use PyPI, Conda, or Pipenv to install it, and it’s ready to rock. More on this in the section comparing Prefect with Airflow.

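# The code in this article uses the Prefect 1.x API (Flow, flow.run()), so pin the version below 2.
pip install "prefect<2"
# conda install -c conda-forge "prefect<2"
# pipenv install "prefect<2"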
Bash

Before we dive into using Prefect, let’s first see an unmanaged workflow. It makes understanding the role of Prefect in workflow management easy.

 

The below script queries an API (Extract—E), picks the relevant fields from it (Transform—T), and appends them to a file (Load—L). It contains three Python functions that perform each of the tasks mentioned. It’s a straightforward yet everyday use case of workflow management tools—ETL.

import requests

def extract() -> dict:
    """Use the Open Weather Map API to fetch Boston weather data.
    Returns:
        dict: a JSON response of many weather measurements.
    """

    url = "https://api.openweathermap.org/data/2.5/weather"

    # TODO: Use a real API key. You can get a free one from https://openweathermap.org/
    response = requests.request(
        "GET", url, params={"q": "Boston", "appid": "e5ecbcc49e3debeead24d0fe012fb46e"}
    )

    return response.json()


def transform(response: dict) -> float:
    """Process the response and extract windspeed information
    Args:
        response (dict): Response JSON from extract task
    Returns:
        float: Current wind speed
    """
    return response.get("wind", {}).get("speed", 0.0)


def load(speed: float):
    """Append data to file
    Args:
        speed (float): Windspeed from transform task
    """

    with open("windspeed.txt", "a") as f:
        f.write(str(speed) + "\n")


if __name__ == "__main__":
    # Execute tasks in the right order.
    response = extract()
    windspeed = transform(response)
    load(windspeed)
Python

This script downloads weather data from the OpenWeatherMap API and stores the windspeed value in a file. ETL applications in real life could be complex. But this example application covers the fundamental aspects very well.

Note: Please replace the API key with a real one. You can get one from https://openweathermap.org/api.

You can run this script with the command python app.py, where app.py is the name of your script file. This will create a new file called windspeed.txt in the current directory with one value: the windspeed in Boston, MA, at the time you call the API. If you rerun the script, it’ll append another value to the same file.
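If you’d like to check the transform step without calling the API, here is a small sketch that runs it on a hand-written payload. The sample values are made up, and the second call uses an assumed error-style body with no "wind" key to show what the fallback does.

```python
# A trimmed, made-up payload in the general shape the weather endpoint returns.
sample_response = {
    "name": "Boston",
    "wind": {"speed": 4.6, "deg": 250},
}


def transform(response: dict) -> float:
    """Pick the wind speed, falling back to 0.0 when it is missing."""
    return response.get("wind", {}).get("speed", 0.0)


print(transform(sample_response))  # 4.6
print(transform({"cod": 401}))     # 0.0, because there is no "wind" key
```

Note the second result: with a bad API key, the script would quietly append 0.0 to the file instead of failing. Whether that is what you want depends on your use case.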


Your first Prefect ETL workflow.

The above script works well. Yet, it lacks some critical features of a complete ETL, such as retrying and scheduling. Also, as mentioned earlier, a real-life ETL may have hundreds of tasks in a single workflow. Some of them can be run in parallel, whereas some depend on one or more other tasks.
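To picture what "some run in parallel, some depend on others" means, here is a minimal sketch using Python’s standard graphlib module (Python 3.9+). This is not how Prefect works internally, and the task names are hypothetical; it only shows how a dependency map turns into batches of tasks that can run together.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks, each mapped to the set of tasks it depends on.
dependencies = {
    "extract_weather": set(),
    "extract_air_quality": set(),
    "transform": {"extract_weather", "extract_air_quality"},
    "load": {"transform"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

batches = []
while sorter.is_active():
    # Tasks whose dependencies have all finished; these could run in parallel.
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

print(batches)
# [['extract_air_quality', 'extract_weather'], ['transform'], ['load']]
```

The two extract tasks land in the same batch because neither depends on the other. A workflow tool keeps track of this for you across hundreds of tasks.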

Imagine a temporary network issue prevents you from calling the API. The script would fail immediately with no further attempt. In live applications, such failures aren’t rare. They happen for several reasons: server downtime, network downtime, or exceeding the server’s query limit.
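To get a feel for what you’d have to write by hand to survive such failures, here is a rough sketch of a retry helper in plain Python. The names with_retries and flaky_extract are hypothetical, and the delay is set to zero only so the example finishes instantly.

```python
import time


def with_retries(func, max_retries=3, delay_seconds=0.0):
    """Call func, retrying up to max_retries more times if it raises."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == max_retries:
                raise  # out of retries, so surface the last error
            print(f"attempt {attempt + 1} failed ({exc}); retrying")
            time.sleep(delay_seconds)


# A stand-in for extract() that fails twice before it succeeds.
calls = {"count": 0}


def flaky_extract():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("network unreachable")
    return {"wind": {"speed": 4.6}}


result = with_retries(flaky_extract, max_retries=3, delay_seconds=0.0)
print(result, calls["count"])  # {'wind': {'speed': 4.6}} 3
```

This works, but you’d have to wrap every task this way and still wouldn’t get logging, state tracking, or a UI.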

Also, you have to manually execute the above script every time you want to update your windspeed.txt file. Yet, scheduling the workflow to run at a specific time or at a predefined interval is common in ETL workflows.

This is where tools such as Prefect and Airflow come to the rescue. Here’s how you could tweak the above Python code to make it a Prefect workflow.

import requests

# Import Prefect's task and Flow, and Python's timedelta
from prefect import task, Flow
from datetime import timedelta

# The decorator specifies how many times to retry and the delay between retries.
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract() -> dict:
    ...


@task
def transform(response: dict) -> float:
    ...

@task(max_retries=3, retry_delay=timedelta(minutes=5))
def load(speed: float):
    ...


# Create a Prefect flow
with Flow("Windspeed Tracker") as flow:
    # Execute tasks in the right order.
    response = extract()
    windspeed = transform(response)
    load(windspeed)

if __name__ == "__main__":
    # Execute the flow
    flow.run()
Python

The @task decorator converts a regular Python function into a Prefect task. The optional arguments allow you to specify its retry behavior. In the above example, we’ve configured the extract task to retry up to three times before it fails, with a three-minute delay before each retry.
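Since retry_delay takes a datetime.timedelta, it’s worth spelling out the unit. A quick check with the standard library shows that the first positional argument of timedelta is days, so keyword arguments are the safer habit.

```python
from datetime import timedelta

# The first positional argument is days, not minutes or seconds.
print(timedelta(5))                       # 5 days, 0:00:00
print(timedelta(minutes=5))               # 0:05:00
print(timedelta(5) == timedelta(days=5))  # True
```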

With this new setup, our ETL is resilient to the network issues we discussed earlier.

To test it, disconnect your computer from the network and run the script with python app.py. You’ll see a message that the first attempt failed and that the next one will begin in three minutes. Within those three minutes, reconnect your computer to the internet. The already-running script will then finish without any errors.

Scheduling workflows with Prefect.

Retrying is only part of the ETL story. Another challenge for many workflow applications is running them at scheduled intervals. Prefect’s scheduling API is straightforward for any Python programmer. Here’s how it works.

...
# Imports to facilitate Scheduling.
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule

...

# Create a schedule object
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=5),
    interval=timedelta(minutes=1),
)

# Attach the schedule object to the windspeed tracker flow.
with Flow("Windspeed Tracker", schedule=schedule) as flow:
    response = extract()
    windspeed = transform(response)
    load(windspeed)
Python

We’ve created an IntervalSchedule object that starts five seconds after the script begins executing. We’ve also configured it to run at one-minute intervals.
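To make the idea of a start date plus an interval concrete, here is a sketch of the run times such a schedule implies, using only the standard library. It is not Prefect’s implementation; interval_run_times is a hypothetical helper, and the start date is fixed so the output is reproducible.

```python
from datetime import datetime, timedelta
from itertools import islice


def interval_run_times(start_date, interval):
    """Yield start_date, start_date + interval, start_date + 2 * interval, ..."""
    current = start_date
    while True:
        yield current
        current += interval


start = datetime(2024, 1, 1, 12, 0, 5)  # fixed example start date
first_runs = list(islice(interval_run_times(start, timedelta(minutes=1)), 3))
for run_time in first_runs:
    print(run_time.isoformat())
# 2024-01-01T12:00:05
# 2024-01-01T12:01:05
# 2024-01-01T12:02:05
```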

If you run the script with python app.py and monitor the windspeed.txt file, you will see new values in it every minute.

In addition to this simple scheduling, Prefect’s schedule API offers more control. You can schedule workflows in a cron-like manner, use clock times with timezones, or do more fun stuff like executing a workflow only on weekends. I haven’t covered them all here, but Prefect’s official docs about this are perfect.
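The weekend case boils down to a filter over candidate run dates. Here is the idea in plain Python, as a sketch only; is_weekend here is my own helper for illustration, so check Prefect’s docs for the filters it actually ships.

```python
from datetime import date, timedelta


def is_weekend(day):
    """Saturday is weekday() 5 and Sunday is weekday() 6."""
    return day.weekday() >= 5


first = date(2024, 1, 1)  # a Monday
week = [first + timedelta(days=i) for i in range(7)]
weekend_days = [d.isoformat() for d in week if is_weekend(d)]
print(weekend_days)  # ['2024-01-06', '2024-01-07']
```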

The Prefect UI.

Like Airflow (and many others), Prefect ships with a server that has a beautiful UI. It allows you to control and visualize your workflow executions.

The prefect UI

You need to have Docker and Docker Compose installed on your computer to run this. But starting it takes, surprisingly, a single command.

prefect server start
Bash

Starting the Prefect Server

This command will start the Prefect server, and you can access it through your web browser: <http://localhost:8080/>.

However, the Prefect server alone cannot execute your workflows. Its role is only to provide a control panel for all your Prefect activities. Because this dashboard is decoupled from the rest of the application, you can use Prefect Cloud to do the same. We’ll discuss this in detail later.

To execute tasks, we need a few more things. The good news is they, too, aren’t complicated.

Because the server is merely a control panel, we need an agent to execute the workflow. The command below will start a local agent. Instead of a local agent, you can choose a Docker agent or a Kubernetes one if your project needs them.

prefect agent local start
Bash

Starting Prefect local agent

Once the server and the agent are running, you’ll have to create a project and register your workflow with that project. To do this, change the line that executes the flow to the following.

if __name__ == "__main__":
    flow.register('Tutorial')
Python

Now in the terminal, you can create a project with the prefect create project <project name> command. Then rerunning the script will register it to the project instead of running it immediately.

prefect create project 'Tutorial'
python app.py
Bash
Running the Prefect project to register it with the server.

In the web UI, you can see the new project ‘Tutorial’ in the dropdown, and our windspeed tracker is in the list of flows. The flow is already scheduled and running. If you prefer, you can also trigger a run manually.

The registered project on Prefect server

 

Running workflows with parameters.

The workflow we created in the previous exercise is rigid. It queries only for Boston, MA, and we cannot change it. This is where we can use parameters. Here’s how we tweak our Python code to accept a parameter at run time.

# Import parameters
from prefect import task, Flow, Parameter

# Tweak the function to accept city argument.
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract(city: str) -> dict:
    url = "https://api.openweathermap.org/data/2.5/weather"

    # TODO: Use a real API key. You can get a free one from https://openweathermap.org/
    response = requests.request(
        "GET", url, params={"q": city, "appid": "e5ecbcc49e3debeead24d0fe012fb46e"}
    )

    return response.json()

...

with Flow("Windspeed Tracker", schedule=schedule) as flow:
    # create a city parameter with the default value Boston and pass it to the extract task.
    city = Parameter("city", default="Boston")
    response = extract(city)
    ...

...
Python

We’ve changed the function to accept the city argument and set it dynamically in the API query. Inside the Flow, we create a parameter object with the default value ‘Boston’ and pass it to the Extract task.
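To see what the parameter actually changes, here is a sketch that builds the request URL for different cities with the standard library. build_url is a hypothetical helper, and OPENWEATHER_API_KEY is an environment variable name I’m assuming for illustration; reading the key from the environment also keeps it out of your source code.

```python
import os
from urllib.parse import urlencode

BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_url(city="Boston"):
    """Return the request URL for a city; the key comes from an assumed env var."""
    api_key = os.environ.get("OPENWEATHER_API_KEY", "<your API key>")
    return BASE_URL + "?" + urlencode({"q": city, "appid": api_key})


print(build_url())            # falls back to the default city, Boston
print(build_url("New York"))  # the space is encoded as "+"
```

If you run the flow locally rather than from the UI, Prefect 1.x should also accept the value directly, along the lines of flow.run(parameters={"city": "London"}).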

If you run the windspeed tracker workflow manually in the UI, you’ll see a section called input. Here you can set the value of the city for every execution.

Running the project from Prefect server

This is a convenient way to run workflows. In many cases, ETLs and any other workflow come with run-time parameters.

Why did I decide to migrate from Airflow to Prefect?

Airflow is a fantastic platform for workflow management. It saved me a ton of time on many projects. Yet, we need to appreciate new technologies taking over the old ones. That’s the case with Airflow and Prefect.

Airflow got many things right, but its core assumptions never anticipated the rich variety of data applications that have emerged.

— Prefect Documentation.

What I describe here aren’t dead-ends if you prefer Airflow. We have workarounds for most problems. Yet, it’s convenient in Prefect because the tool natively supports them.

Prefect’s installation is exceptionally straightforward compared to Airflow’s. For trained eyes, installing Airflow may not be a problem. Yet, for anyone just getting started with workflow orchestration and automation, it’s a hassle.

Airflow needs a server running in the backend to perform any task. Yet, in Prefect, a server is optional. This is a massive benefit of using Prefect. I have many pet projects running on my computer as services. Earlier, I had to have an Airflow server start at boot. Because Prefect can run standalone, I don’t have to turn on this additional server anymore.

Airflow doesn’t have the flexibility to run workflows (or DAGs) with parameters. The workaround I used to have is to let the application read them from a database. This isn’t an excellent programming technique for such a simple task. Prefect’s parameter concept is exceptional on this front.

Prefect allows having different versions of the same workflow. Every time you register a workflow to the project, it creates a new version. If you need to run a previous version, you can easily select it in a dropdown. This isn’t possible with Airflow.

Prefect also allows us to create teams and role-based access controls. Each team could manage its configuration. Authorization is a critical part of every modern application, and Prefect handles it in the best way possible.

Lastly, I find Prefect’s UI more intuitive and appealing. Airflow’s UI, especially its task execution visualization, was difficult at first to understand.

Prefect’s ecosystem and integration with other technologies.

Prefect has inbuilt integration with many other technologies. It eliminates a ton of overhead and makes working with them super easy.

Live projects often have to deal with several technologies. For example, when your ETL fails, you may want to send an email or a Slack notification to the maintainer.

In Prefect, sending such notifications is effortless. You can use the EmailTask from Prefect’s task library, set the credentials, and start sending emails.

You can learn more about Prefect’s rich ecosystem in their official documentation. In this article, we’ll see how to send email notifications.

To send emails, we need to make the credentials accessible to the Prefect agent. You can do that by creating the file $HOME/.prefect/config.toml with the content below.

[context.secrets]
EMAIL_USERNAME = "<your email id>"
EMAIL_PASSWORD = "<your email password>"
TOML

Your app is now ready to send emails. Here’s how we send a notification when we successfully capture a windspeed measure.

# Import email task
from prefect.tasks.notifications.email_task import EmailTask

...

# Create an email_task object. Use all static content here.
email_task = EmailTask(
    subject="A new windspeed captured",
    email_to="<recipient email address>",
    email_from="<sender email address>",
)

with Flow("Windspeed Tracker", schedule=schedule) as flow:
    ...

    # Call the email task with variable content.
    email_task(
        msg=str(windspeed),
    )
Python

In the above code, we’ve created an instance of the EmailTask class. We passed all the static elements of our email configuration when initializing it. Then, inside the Flow, we called it with the variable content.

The configuration above will send an email with the captured windspeed measurement as its body. Its subject will always remain ‘A new windspeed captured.’
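The split between fields fixed at creation and fields supplied per run can be pictured with the standard library alone. This sketch is not how EmailTask is implemented; make_email_builder is a hypothetical helper, the addresses are placeholders, and nothing is actually sent.

```python
from email.message import EmailMessage


def make_email_builder(subject, email_to, email_from):
    """Fix the static fields once and return a function that takes the body."""

    def build(msg):
        email = EmailMessage()
        email["Subject"] = subject
        email["To"] = email_to
        email["From"] = email_from
        email.set_content(msg)
        return email

    return build


build_windspeed_email = make_email_builder(
    subject="A new windspeed captured",
    email_to="recipient@example.com",
    email_from="sender@example.com",
)

email = build_windspeed_email(str(4.6))
print(email["Subject"])             # A new windspeed captured
print(email.get_content().strip())  # 4.6
```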

Prefect Cloud vs. On-Premise Server Deployments.

We’ve already looked into how we can start an on-premise server. Because this server is only a control panel, you could easily use the cloud version instead. To do this, we have a few additional steps to follow.

  1. Create a Prefect cloud account.
  2. Generate a key from the API Key Page.
  3. In your terminal, set the backend to the cloud: prefect backend cloud.
  4. Also, log in with the generated key: prefect auth login --key YOUR_API_KEY.
  5. Now, start the agent as usual: prefect agent local start.

In the cloud dashboard, you can manage everything you did on the local server before.

A big question when choosing between cloud and server versions is security. According to Prefect’s docs, the server only stores workflow execution-related data and voluntary information provided by the user. Since the agent in your local computer executes the logic, you can control where you store your data.

The cloud option is suitable for performance reasons too. With one cloud server, you can manage more than one agent. Thus, you can scale your app effortlessly.

Final Thoughts

Airflow was my ultimate choice for building ETLs and other workflow management applications. Yet, Prefect changed my mind, and now I’m migrating everything from Airflow to Prefect.

Prefect is a straightforward tool that is flexible enough to extend beyond what Airflow can do. You can run it even inside a Jupyter Notebook. Also, you can host it as a complete task management solution.

In addition to the central problem of workflow management, Prefect solves several other issues you may frequently encounter in a live system. Managing teams with authorization controls and sending notifications are some of them.

In this article, we’ve discussed how to create an ETL that

  • retries some tasks as configured;
  • runs workflows on a schedule;
  • accepts run-time parameters; and
  • sends an email notification when it’s done.

We’ve only scratched the surface of Prefect’s capabilities. I recommend reading the official documentation for more information.

