Connection Examples#

Connecting to a default Redis instance, running locally.#

[2]:
import redis

connection = redis.Redis()
connection.ping()
[2]:
True

By default Redis return binary responses, to decode them use decode_responses=True#

[3]:
import redis

decoded_connection = redis.Redis(decode_responses=True)
decoded_connection.ping()
[3]:
True

By default this library uses the RESP 2 protocol. To enable RESP3, set protocol=3.#

import redis

r = redis.Redis(protocol=3) rcon.ping()

Connecting to a redis instance, specifying a host and port with credentials.#

[4]:
import redis

user_connection = redis.Redis(host='localhost', port=6380, username='dvora', password='redis', decode_responses=True)
user_connection.ping()
[4]:
True

Connecting to a redis instance with username and password credential provider#

[ ]:
import redis

creds_provider = redis.UsernamePasswordCredentialProvider("username", "password")
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
user_connection.ping()

Connecting to a redis instance with standard credential provider#

[ ]:
from typing import Tuple
import redis

creds_map = {"user_1": "pass_1",
             "user_2": "pass_2"}

class UserMapCredentialProvider(redis.CredentialProvider):
    def __init__(self, username: str):
        self.username = username

    def get_credentials(self) -> Tuple[str, str]:
        return self.username, creds_map.get(self.username)

# Create a default connection to set the ACL user
default_connection = redis.Redis(host="localhost", port=6379)
default_connection.acl_setuser(
    "user_1",
    enabled=True,
    passwords=["+" + "pass_1"],
    keys="~*",
    commands=["+ping", "+command", "+info", "+select", "+flushdb"],
)

# Create a UserMapCredentialProvider instance for user_1
creds_provider = UserMapCredentialProvider("user_1")
# Initiate user connection with the credential provider
user_connection = redis.Redis(host="localhost", port=6379,
                              credential_provider=creds_provider)
user_connection.ping()

Connecting to a redis instance first with an initial credential set and then calling the credential provider#

[ ]:
from typing import Union
import redis

class InitCredsSetCredentialProvider(redis.CredentialProvider):
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.call_supplier = False

    def call_external_supplier(self) -> Union[Tuple[str], Tuple[str, str]]:
        # Call to an external credential supplier
        raise NotImplementedError

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        if self.call_supplier:
            return self.call_external_supplier()
        # Use the init set only for the first time
        self.call_supplier = True
        return self.username, self.password

cred_provider = InitCredsSetCredentialProvider(username="init_user", password="init_pass")

Connecting to a redis instance with AWS Secrets Manager credential provider.#

[ ]:
import redis
import boto3
import json
import cachetools.func

class SecretsManagerProvider(redis.CredentialProvider):
    def __init__(self, secret_id, version_id=None, version_stage='AWSCURRENT'):
        self.sm_client = boto3.client('secretsmanager')
        self.secret_id = secret_id
        self.version_id = version_id
        self.version_stage = version_stage

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        @cachetools.func.ttl_cache(maxsize=128, ttl=24 * 60 * 60) #24h
        def get_sm_user_credentials(secret_id, version_id, version_stage):
            secret = self.sm_client.get_secret_value(secret_id, version_id)
            return json.loads(secret['SecretString'])
        creds = get_sm_user_credentials(self.secret_id, self.version_id, self.version_stage)
        return creds['username'], creds['password']

my_secret_id = "EXAMPLE1-90ab-cdef-fedc-ba987SECRET1"
creds_provider = SecretsManagerProvider(secret_id=my_secret_id)
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
user_connection.ping()

Connecting to a redis instance with ElastiCache IAM credential provider.#

[4]:
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse

import botocore.session
import redis
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached

class ElastiCacheIAMProvider(redis.CredentialProvider):
    def __init__(self, user, cluster_name, region="us-east-1"):
        self.user = user
        self.cluster_name = cluster_name
        self.region = region

        session = botocore.session.get_session()
        self.request_signer = RequestSigner(
            ServiceId("elasticache"),
            self.region,
            "elasticache",
            "v4",
            session.get_credentials(),
            session.get_component("event_emitter"),
        )

    # Generated IAM tokens are valid for 15 minutes
    @cached(cache=TTLCache(maxsize=128, ttl=900))
    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        query_params = {"Action": "connect", "User": self.user}
        url = urlunparse(
            ParseResult(
                scheme="https",
                netloc=self.cluster_name,
                path="/",
                query=urlencode(query_params),
                params="",
                fragment="",
            )
        )
        signed_url = self.request_signer.generate_presigned_url(
            {"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
            operation_name="connect",
            expires_in=900,
            region_name=self.region,
        )
        # RequestSigner only seems to work if the URL has a protocol, but
        # Elasticache only accepts the URL without a protocol
        # So strip it off the signed URL before returning
        return (self.user, signed_url.removeprefix("https://"))

username = "barshaul"
cluster_name = "test-001"
endpoint = "test-001.use1.cache.amazonaws.com"
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
user_connection = redis.Redis(host=endpoint, port=6379, credential_provider=creds_provider)
user_connection.ping()
[4]:
True

Connecting to Redis instances by specifying a URL scheme.#

Parameters are passed to the following schems, as parameters to the url scheme.

Three URL schemes are supported:

[7]:
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&health_check_interval=2")
url_connection.ping()
[7]:
True

Connecting to Redis instances by specifying a URL scheme and the RESP3 protocol.#

[ ]:
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&health_check_interval=2&protocol=3")
url_connection.ping()

Connecting to a Sentinel instance#

[ ]:
from redis.sentinel import Sentinel
sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
sentinel.discover_master("redis-py-test")