How to Speed up Python Data Pipelines up to 91X?

July 18, 2021


The frustrating thing about being a data scientist is waiting for big-data pipelines to finish.

Although Python is the favorite language of data scientists, it isn't the fastest. As an interpreted language, it is translated at execution time, which makes it slow. CPython's global interpreter lock (GIL) also lets only one thread run Python bytecode at a time, which makes parallel execution hard. Sadly, not every data scientist is an expert in C++.

What if there were a way to run Python code in parallel and at the speed of compiled code? That's the problem Tuplex solves.

Tuplex is a parallel big-data processing framework for pipelines written in Python. If you've worked with Apache Spark, its API may look familiar. Yet, unlike Spark, Tuplex doesn't invoke the Python interpreter. Instead, it optimizes the pipeline and compiles it into LLVM bytecode that runs at blazing speed, comparable to hand-optimized C++ code.

Python's standard tool for parallel execution is the multiprocessing module. Its drawback is that it doesn't work reliably in interactive environments; the Python documentation itself warns that its Pool examples won't work in the interactive interpreter. Yet we data scientists love our Jupyter notebooks. Moreover, under the hood, multiprocessing simply starts several subprocesses and leaves their scheduling to the operating system, so there's no guarantee they actually run in parallel.
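Which start method multiprocessing uses decides whether functions defined in a notebook can reach the worker processes. Here is a quick check using only the standard library; describe_start_method is a hypothetical helper name, and the comments summarize the documented behavior of each start method.

```python
import multiprocessing as mp


def describe_start_method() -> str:
    """Report how this platform starts multiprocessing worker processes."""
    # 'fork' copies the parent process, so workers inherit functions defined
    # interactively. 'spawn' (the default on Windows and macOS) and 'forkserver'
    # start workers from a fresh interpreter that doesn't share the interactive
    # session's state, which is why functions defined in a REPL or notebook
    # often can't be found by the workers.
    method = mp.get_start_method()
    available = ", ".join(mp.get_all_start_methods())
    return f"start method: {method} (available: {available})"


if __name__ == "__main__":
    print(describe_start_method())
```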

In this article, we'll discuss:

  • How to install Tuplex;
  • How to run a trivial data pipeline;
  • Tuplex's handy exception handling;
  • How advanced configuration can help you; and
  • How it benchmarks against plain Python code.

I'm sure this will be a walk in the park.


Get up and running with Tuplex.

Despite its power, Tuplex is incredibly straightforward to set up: it's available on PyPI.

pip install tuplex

This method is recommended on Linux; on a Mac, you may have to use a Docker container instead.

One caveat: Tuplex hasn't been tested on Windows PCs, or at least its documentation doesn't mention them. Please share if you've had luck on Windows.
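After installing, a quick way to confirm that Tuplex is importable and to see which version pip installed is the standard library's importlib. This is a minimal sketch; tuplex_version is a hypothetical helper that returns None when the package isn't available (for example, on a platform where installation failed).

```python
import importlib.util
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def tuplex_version() -> Optional[str]:
    """Return the installed Tuplex version string, or None if it's missing."""
    # find_spec locates a top-level module without importing it.
    if importlib.util.find_spec("tuplex") is None:
        return None
    try:
        # Reads the version from the installed distribution's metadata.
        return version("tuplex")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(tuplex_version() or "Tuplex is not installed")
```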


Your first data pipeline with Tuplex.

Once you've installed Tuplex, running a parallel task is easy. Here is an example from the official Tuplex documentation.

from tuplex import *

c = Context()

# access elements via tuple syntax
result = c.parallelize([(1, 10), (2, 20), (3, 30)]) \
    .map(lambda x: x[0] + x[1]) \
    .collect()
print(result)  # [11, 22, 33]

First, create a Tuplex Context; the class is imported from the tuplex module.

From there, running a function in parallel takes only three steps: parallelize, map, and collect.

The context's parallelize method is your starting point. It takes a list of input values for your function, and each element of the list passes through the function in parallel with the others.

Next, the map method takes a user-defined function that transforms each input. Finally, the collect method gathers the outputs of all the parallel executions.
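To make the three steps concrete, here is a purely sequential stand-in that mimics the parallelize, map, and collect chain in plain Python. ToyPipeline and parallelize are hypothetical names, not Tuplex's classes; the sketch does no parallelism or compilation and only shows what each step does to the data.

```python
from typing import Any, Callable, Iterable, List


class ToyPipeline:
    """Sequential stand-in for a parallelize -> map -> collect chain."""

    def __init__(self, rows: Iterable[Any]) -> None:
        self._rows = list(rows)

    def map(self, fn: Callable[[Any], Any]) -> "ToyPipeline":
        # Apply fn to every row; each row is passed as a single argument,
        # so tuple rows are accessed with x[0], x[1] as in the example above.
        return ToyPipeline(fn(row) for row in self._rows)

    def collect(self) -> List[Any]:
        # Materialize the transformed rows as a Python list.
        return list(self._rows)


def parallelize(rows: Iterable[Any]) -> ToyPipeline:
    """Entry point: wrap the input values in a pipeline."""
    return ToyPipeline(rows)


result = parallelize([(1, 10), (2, 20), (3, 30)]).map(lambda x: x[0] + x[1]).collect()
print(result)  # [11, 22, 33]
```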


Convenient exception handling in Tuplex.

My favorite part of Tuplex is how conveniently it manages exceptions. Error handling in data pipelines is a daunting experience. Imagine processing a data stream for hours, only to discover that a subtle division-by-zero error has killed everything you've done.

from tuplex import *
c = Context()

c.parallelize([(1, 0), (2, 1), (3, 0), (4, -1)]) \
 .map(lambda x, y: x / y) \
 .collect()

If you processed this data with Spark or plain Python, it would fail with a ZeroDivisionError.

In Tuplex, error handling is automatic: rows that raise an exception are left out, and the rest are returned. The code above returns [2.0, -4.0], because the first and third inputs in the list can't be processed.
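The skip-on-error behavior can be approximated in plain Python to predict what such a pipeline returns. map_skipping_errors is a hypothetical helper, not Tuplex's implementation: it unpacks each tuple into the function's arguments (as the two-argument lambda above expects) and drops failing rows, but unlike Tuplex it keeps no record of the exceptions.

```python
from typing import Any, Callable, Iterable, List, Tuple


def map_skipping_errors(
    rows: Iterable[Tuple[Any, ...]], fn: Callable[..., Any]
) -> List[Any]:
    """Apply fn to each tuple row, dropping rows whose call raises."""
    results = []
    for row in rows:
        try:
            # Unpack the tuple so fn receives one argument per field.
            results.append(fn(*row))
        except Exception:
            # The failing row is simply excluded from the output.
            continue
    return results


print(map_skipping_errors([(1, 0), (2, 1), (3, 0), (4, -1)], lambda x, y: x / y))
# [2.0, -4.0]
```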

But silently ignoring errors can be a problem. Often you need to handle them explicitly, and Tuplex's API makes that easy.

from tuplex import *
c = Context()

c.parallelize([(1, 0), (2, 1), (3, 0), (4, -1)]) \
 .map(lambda x, y: x / y) \
 .resolve(ZeroDivisionError, lambda a, b: 0) \
 .collect()

Tuplex makes error handling effortless: you chain a resolve call right after the map whose errors you want to handle, here between map and collect. In the example above, we pass the ZeroDivisionError type and resolve it by returning 0 for each row that would otherwise fail.

The second argument to resolve is a function that tells Tuplex what to return when that type of error occurs. It takes the same arguments as the function passed to map, as the two-argument lambda in the example shows.
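In plain Python, a map followed by resolve behaves roughly like the hypothetical helper below: when the function raises an exception type that has a resolver, the resolver is called with the same arguments and its return value takes the failing row's place. This is a sketch of the semantics described here, not Tuplex's API; rows whose exception has no resolver are dropped, matching the default behavior described earlier.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple, Type


def map_with_resolvers(
    rows: Iterable[Tuple[Any, ...]],
    fn: Callable[..., Any],
    resolvers: Dict[Type[BaseException], Callable[..., Any]],
) -> List[Any]:
    """Apply fn to each row; on a matching exception, use the resolver's value."""
    results = []
    for row in rows:
        try:
            results.append(fn(*row))
        except Exception as exc:
            for exc_type, resolver in resolvers.items():
                if isinstance(exc, exc_type):
                    # The resolver receives the same arguments as fn.
                    results.append(resolver(*row))
                    break
            # If no resolver matched, the row is dropped.
    return results


rows = [(1, 0), (2, 1), (3, 0), (4, -1)]
print(map_with_resolvers(rows, lambda x, y: x / y, {ZeroDivisionError: lambda a, b: 0}))
# [0, 2.0, 0, -4.0]
```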


Configuring Tuplex for advanced use cases.

You can configure Tuplex in two ways. The first is straightforward: pass the options as keyword arguments when you create the Context. Here is an example that raises the executor memory to 2 GB.

from tuplex import *
c = Context(executorMemory="2G")

Tuplex also accepts configuration from a YAML file. In production, you'll often want configurations stored in files, and YAML files make it easy to maintain different configs and share them between development and testing teams.

from tuplex import *
c = Context(conf="/conf/tuplex.yaml")

Here is an example config file from the Tuplex documentation, showing the available options.

# FastETL configuration file
#   created 2019-02-17 16:45:09.940033 UTC
tuplex:
    -   allowUndefinedBehavior: false
    -   autoUpcast: false
    -   csv:
            -   comments: ["#", "~"]
            -   generateParser: true
            -   maxDetectionMemory: 256KB
            -   maxDetectionRows: 100
            -   quotechar: "\""
            -   selectionPushdown: true
            -   separators: [",", ;, "|", "\t"]
    -   driverMemory: 1GB
    -   executorCount: 4
    -   executorMemory: 1GB
    -   logDir: .
    -   normalcaseThreshold: 0.9
    -   partitionSize: 1MB
    -   runTimeLibrary: tuplex_runtime
    -   runTimeMemory: 32MB
    -   runTimeMemoryBlockSize: 4MB
    -   scratchDir: /tmp
    -   useLLVMOptimizer: true

Performance benchmark

Tuplex's promises are intriguing. It's time to see its performance boost in action.

For this benchmark, I'm using a simple prime-counting function. I ran it first in a plain for loop, then with Python's built-in multiprocessing module, and finally with Tuplex.

def count_primes(max_num):
    """Count the prime numbers up to max_num * 1000.
    Input values are in thousands, i.e. 40 means 40,000.
    """
    count = 0
    for num in range(max_num * 1000 + 1):
        if num > 1:
            for i in range(2, num):
                if num % i == 0:
                    break  # num has a divisor, so it isn't prime
            else:
                # for...else: runs only when the loop found no divisor
                count += 1
    return count
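Counting primes by trial division hinges on Python's for...else construct, which is easy to misread: the else block belongs to the for loop, not to the if, and runs only when the loop finishes without hitting break. Here is a minimal sketch of the same check as a standalone function; is_prime is a hypothetical helper, not part of the benchmark.

```python
def is_prime(num: int) -> bool:
    """Trial-division primality test using for...else."""
    if num < 2:
        return False
    for i in range(2, num):
        if num % i == 0:
            break  # found a divisor, so the else branch is skipped
    else:
        return True  # runs only if the loop completed without a break
    return False


print([n for n in range(30) if is_prime(n)])
# [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```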

Executing intensive tasks in standard Python.

Running the function in a for loop is the simplest approach. I use Jupyter's '%%time' cell magic to track the execution time.

%%time
for val in [10, 20, 30, 40]:
    print(count_primes(val))

Over multiple runs, the code above took 51.2 seconds on average.
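'%%time' only works inside Jupyter. To time the same loop in a plain script, or to average several runs as done here, a small helper built on time.perf_counter works anywhere. average_runtime is a hypothetical name, and the demo uses a cheap stand-in workload; to reproduce the benchmark, you would pass a lambda that runs count_primes over [10, 20, 30, 40].

```python
import time
from statistics import mean
from typing import Any, Callable, List, Tuple


def average_runtime(fn: Callable[[], Any], repeats: int = 3) -> Tuple[Any, float]:
    """Call fn() `repeats` times; return the last result and the mean seconds."""
    timings: List[float] = []
    result = None
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        result = fn()
        timings.append(time.perf_counter() - start)
    return result, mean(timings)


if __name__ == "__main__":
    # Stand-in workload; swap in e.g. lambda: [count_primes(v) for v in [10, 20, 30, 40]]
    result, seconds = average_runtime(lambda: sum(range(1_000_000)))
    print(f"result={result}, mean time={seconds:.4f}s")
```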

A plain for loop is expected to be slow. Let's try the same with Python's built-in multiprocessing module. The code below doesn't work reliably in REPLs such as Jupyter notebooks, so put it in a .py file and run it from the command line.

from multiprocessing import Pool
from datetime import datetime

def count_primes(max_num):
    """Count the prime numbers up to max_num * 1000.
    Input values are in thousands, i.e. 40 means 40,000.
    """
    count = 0
    for num in range(max_num * 1000 + 1):
        if num > 1:
            for i in range(2, num):
                if num % i == 0:
                    break  # num has a divisor, so it isn't prime
            else:
                # for...else: runs only when the loop found no divisor
                count += 1
    return count

if __name__ == "__main__":
    start_time = datetime.now()
    with Pool(5) as p:
        print(p.map(count_primes, [10, 20, 30, 40]))
    end_time = datetime.now()

    print(f"It took {end_time - start_time} to run")

The multiprocessing script took 30.76 seconds on average, 20.44 seconds less than the for loop.

Tuplex handling intensive tasks in parallel.

Finally, we run the same prime-counting function with Tuplex. The concise code below took 0.000040 seconds on average and produced the same results.

from tuplex import *

c = Context()

# count_primes is the function defined earlier in this article
print(c.parallelize([10, 20, 30, 40]).map(count_primes).collect())

Compared to the standard Python approaches, Tuplex's speedup is remarkable. In this trivial example, it ran about 769,000 times faster than multiprocessing and 1,280,000 times faster than the plain for loop.

Well... those numbers look too good to be true, so let's stick to the Tuplex team's claim of a 5X to 91X speedup. Nevertheless, Tuplex makes me think twice before I ever write another for loop.
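For reference, the ratios quoted above come straight from the averaged timings reported in this section. This snippet only redoes that arithmetic; the timings dictionary holds the article's measurements and nothing is re-measured, which makes it easy to see how much the tiny Tuplex figure dominates.

```python
# Mean wall-clock times reported in this article, in seconds.
timings = {"for loop": 51.2, "multiprocessing": 30.76, "Tuplex": 0.000040}


def speedup(baseline: str, candidate: str) -> float:
    """How many times faster `candidate` ran than `baseline`."""
    return timings[baseline] / timings[candidate]


print(round(timings["for loop"] - timings["multiprocessing"], 2))  # 20.44 seconds saved
print(round(speedup("for loop", "multiprocessing"), 2))  # 1.66
print(round(speedup("multiprocessing", "Tuplex")))  # 769000
print(round(speedup("for loop", "Tuplex")))  # 1280000
```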

Conclusion

Tuplex is an easy-to-set-up Python package that could save you a ton of time. It speeds up data pipelines by compiling them into LLVM bytecode and executing them in parallel.

The benchmark shows a profound improvement in execution time. At the same time, Tuplex's setup is straightforward, and its syntax and configuration are incredibly flexible.

The coolest part of Tuplex is its convenient exception handling. Error handling in data pipelines has never been this easy. Tuplex also integrates well with interactive shells and Jupyter notebooks, which is usually not the case for compiled languages. Even Python's own multiprocessing module struggles with parallel processing inside a REPL such as a Jupyter notebook.

Tuplex is remarkably successful at boosting Python's performance. But how does it compare to Python's traditional approach to high-performance computing? Here is an article that benchmarks it against Cython.

Related: Challenging Cython—the Python Module for High-Performance Computing.

How we work

Readers support The Analytics Club. We earn through display ads. Also, when you buy something we recommend, we may get an affiliate commission. But it never affects your price or what we pick.
