Pipeline examples#

This example show quickly how to use pipelines in redis-py.

Checking that Redis is running#

[1]:
import redis

r = redis.Redis(decode_responses=True)
r.ping()
[1]:
True

Simple example#

Creating a pipeline instance#

[2]:
pipe = r.pipeline()

Adding commands to the pipeline#

[3]:
pipe.set("a", "a value")
pipe.set("b", "b value")

pipe.get("a")
[3]:
Pipeline<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

Executing the pipeline#

[4]:
pipe.execute()
[4]:
[True, True, 'a value']

The responses of the three commands are stored in a list. In the above example, the two first boolean indicates that the set commands were successful and the last element of the list is the result of the get("a") comand.

Chained call#

The same result as above can be obtained in one line of code by chaining the opperations.

[5]:
pipe = r.pipeline()
pipe.set("a", "a value").set("b", "b value").get("a").execute()
[5]:
[True, True, 'a value']

Here’s a slightly more advanced example for chaining complex operations using the builder pattern.

[ ]:
from dataclasses import dataclass
from typing import Optional

@dataclass
class User:
    email: str
    username: Optional[str] = None

@dataclass
class Post:
    title: str
    body: str
    author: Optional[str] = None

class RedisRepository:
    def __init__(self):
        self.pipeline = r.pipeline()

    def add_user(self, user: User):
        if not user.username:
            user.username = user.email.split("@")[0]
        self.pipeline.hset(f"user:{user.username}", mapping={"username": user.username, "email": user.email})
        return self

    def add_post(self, post: Post):
        self.pipeline.hset(f"post:#{post.title}#", mapping={"title": post.title, "body": post.body, "author": post.author})
        if post.author:
            self.pipeline.sadd(f"user:{post.author}:posts", f"post:#{post.title}#")
        return self

    def add_follow(self, follower: str, following: str):
        self.pipeline.sadd(f"user:{follower}:following", following)
        self.pipeline.sadd(f"user:{following}:followers", follower)
        return self

    def execute(self):
        return self.pipeline.execute()

pipe = RedisRepository()
results = (pipe
    .add_user(User(email="alice@example.com"))
    .add_user(User(email="bob@example.com"))
    .add_follow("alice", "bob")
    .add_post(Post(title="Hello World", body="I'm using Redis!", author="alice"))
    .execute()
)

Performance comparison#

Using pipelines can improve performance, for more informations, see Redis documentation about pipelining. Here is a simple comparison test of performance between basic and pipelined commands (we simply increment a value and measure the time taken by both method).

[6]:
from datetime import datetime

incr_value = 100000

Without pipeline#

[7]:
r.set("incr_key", "0")

start = datetime.now()

for _ in range(incr_value):
    r.incr("incr_key")
res_without_pipeline = r.get("incr_key")

time_without_pipeline = (datetime.now() - start).total_seconds()
[8]:
print("Without pipeline")
print("================")
print("Time taken: ", time_without_pipeline)
print("Increment value: ", res_without_pipeline)
Without pipeline
================
Time taken:  21.759733
Increment value:  100000

With pipeline#

[9]:
r.set("incr_key", "0")

start = datetime.now()

pipe = r.pipeline()
for _ in range(incr_value):
    pipe.incr("incr_key")
pipe.get("incr_key")
res_with_pipeline = pipe.execute()[-1]

time_with_pipeline = (datetime.now() - start).total_seconds()
[10]:
print("With pipeline")
print("=============")
print("Time taken: ", time_with_pipeline)
print("Increment value: ", res_with_pipeline)
With pipeline
=============
Time taken:  2.357863
Increment value:  100000

Using pipelines provides the same result in much less time.