Connection Examples#
Connecting to a default Redis instance, running locally.#
[2]:
import redis
# No arguments are passed, so the client is built entirely from its defaults.
connection = redis.Redis()
# Round-trip check against the server; the cell output below shows True.
connection.ping()
[2]:
True
By default Redis returns binary responses; to decode them, use decode_responses=True#
[3]:
import redis
# decode_responses=True asks the client to decode replies instead of
# returning the raw binary responses.
decoded_connection = redis.Redis(decode_responses=True)
# Round-trip check against the server; the cell output below shows True.
decoded_connection.ping()
[3]:
True
By default this library uses the RESP2 protocol. To enable RESP3, set protocol=3.#
import redis
# protocol=3 switches this client from the default RESP2 to RESP3.
r = redis.Redis(protocol=3)
# Round-trip check on the RESP3 connection created above.
r.ping()
Connecting to a redis instance, specifying a host and port with credentials.#
[4]:
import redis
# Explicit host, port and username/password credentials; replies are decoded
# because decode_responses=True.
user_connection = redis.Redis(host='localhost', port=6380, username='dvora', password='redis', decode_responses=True)
# Round-trip check with the supplied credentials; the cell output below shows True.
user_connection.ping()
[4]:
True
Connecting to a redis instance with username and password credential provider#
[ ]:
import redis
# Wrap a fixed username/password pair in a credential provider object.
creds_provider = redis.UsernamePasswordCredentialProvider("username", "password")
# Hand the provider to the client instead of passing username=/password= directly.
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
# Round-trip check using the provider-supplied credentials.
user_connection.ping()
Connecting to a redis instance with standard credential provider#
[ ]:
from typing import Tuple
import redis
# Static username -> password table consulted by UserMapCredentialProvider.
creds_map = {
    "user_1": "pass_1",
    "user_2": "pass_2",
}


class UserMapCredentialProvider(redis.CredentialProvider):
    """Credential provider that looks a user's password up in ``creds_map``."""

    def __init__(self, username: str):
        """Remember which user this provider supplies credentials for."""
        self.username = username

    def get_credentials(self) -> Tuple[str, str]:
        """Return ``(username, password)`` for the configured user.

        The password is read with ``dict.get``, so it is ``None`` when the
        username has no entry in ``creds_map``.
        """
        password = creds_map.get(self.username)
        return self.username, password
# Create a default connection to set the ACL user
default_connection = redis.Redis(host="localhost", port=6379)
# Define the ACL user "user_1": enabled, with the password "pass_1",
# the key pattern "~*" and the five commands listed below.
# NOTE(review): the "+" prefixes presumably mean "add this password/command"
# in redis-py's acl_setuser convention -- confirm against its documentation.
default_connection.acl_setuser(
"user_1",
enabled=True,
passwords=["+" + "pass_1"],
keys="~*",
commands=["+ping", "+command", "+info", "+select", "+flushdb"],
)
# Create a UserMapCredentialProvider instance for user_1
creds_provider = UserMapCredentialProvider("user_1")
# Initiate user connection with the credential provider
user_connection = redis.Redis(host="localhost", port=6379,
credential_provider=creds_provider)
# Round-trip check as user_1 ("+ping" is among the commands granted above).
user_connection.ping()
Connecting to a redis instance first with an initial credential set and then calling the credential provider#
[ ]:
# Tuple is needed by the return annotations below; importing it here keeps the
# cell runnable on its own instead of relying on an earlier cell's import.
from typing import Tuple, Union

import redis


class InitCredsSetCredentialProvider(redis.CredentialProvider):
    """Credential provider that serves an initial credential set once.

    The first ``get_credentials()`` call returns the username/password given
    to the constructor; every later call is delegated to
    ``call_external_supplier()``.
    """

    def __init__(self, username, password):
        """Store the initial credentials and mark the supplier as not yet in use."""
        self.username = username
        self.password = password
        # Flipped to True after the initial credentials have been handed out.
        self.call_supplier = False

    def call_external_supplier(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Fetch credentials from an external supplier.

        Placeholder: always raises ``NotImplementedError`` in this example.
        """
        # Call to an external credential supplier
        raise NotImplementedError

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return the initial credentials once, then defer to the external supplier."""
        if self.call_supplier:
            return self.call_external_supplier()
        # Use the init set only for the first time
        self.call_supplier = True
        return self.username, self.password


cred_provider = InitCredsSetCredentialProvider(username="init_user", password="init_pass")
Connecting to a redis instance with AWS Secrets Manager credential provider.#
[ ]:
import json
# Tuple/Union are used by the get_credentials annotation; import them here so
# the cell does not depend on an earlier cell having done so.
from typing import Tuple, Union

import boto3
import cachetools.func
import redis


class SecretsManagerProvider(redis.CredentialProvider):
    """Credential provider backed by an AWS Secrets Manager secret.

    The secret's ``SecretString`` is parsed as JSON and must contain the keys
    ``username`` and ``password``.
    """

    def __init__(self, secret_id, version_id=None, version_stage='AWSCURRENT'):
        """Create the Secrets Manager client and remember which secret to read.

        :param secret_id: identifier of the secret to fetch.
        :param version_id: optional specific secret version; when given it is
            sent instead of ``version_stage``.
        :param version_stage: staging label to fetch when ``version_id`` is
            not given.
        """
        self.sm_client = boto3.client('secretsmanager')
        self.secret_id = secret_id
        self.version_id = version_id
        self.version_stage = version_stage
        # Build the cached fetcher once per instance. Creating it inside
        # get_credentials() would start from an empty cache on every call.
        self._get_sm_user_credentials = cachetools.func.ttl_cache(
            maxsize=128, ttl=24 * 60 * 60  # 24h
        )(self._fetch_sm_user_credentials)

    def _fetch_sm_user_credentials(self, secret_id, version_id, version_stage):
        """Fetch the secret from Secrets Manager and return its parsed JSON payload."""
        # boto3 client operations accept keyword arguments only.
        request = {'SecretId': secret_id}
        if version_id is not None:
            # A version id already pins one exact version, so the staging
            # label is omitted to avoid sending a conflicting pair.
            request['VersionId'] = version_id
        elif version_stage is not None:
            request['VersionStage'] = version_stage
        secret = self.sm_client.get_secret_value(**request)
        return json.loads(secret['SecretString'])

    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return ``(username, password)`` read from the (cached) secret.

        Raises ``KeyError`` if the secret JSON lacks ``username`` or ``password``.
        """
        creds = self._get_sm_user_credentials(self.secret_id, self.version_id, self.version_stage)
        return creds['username'], creds['password']
# Example secret identifier handed to the provider as secret_id.
my_secret_id = "EXAMPLE1-90ab-cdef-fedc-ba987SECRET1"
# version_id / version_stage are left at the provider's defaults.
creds_provider = SecretsManagerProvider(secret_id=my_secret_id)
# The client obtains its credentials from the provider.
user_connection = redis.Redis(host="localhost", port=6379, credential_provider=creds_provider)
# Round-trip check using the provider-supplied credentials.
user_connection.ping()
Connecting to a redis instance with ElastiCache IAM credential provider.#
[4]:
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse
import botocore.session
import redis
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached
class ElastiCacheIAMProvider(redis.CredentialProvider):
    """Credential provider that uses a presigned ElastiCache "connect" URL as the password."""

    def __init__(self, user, cluster_name, region="us-east-1"):
        """Store the connection identity and build the SigV4 request signer.

        :param user: user name placed in the signed request and returned as
            the username.
        :param cluster_name: used as the network location of the signed URL.
        :param region: region passed to the signer (default ``"us-east-1"``).
        """
        self.user = user
        self.cluster_name = cluster_name
        self.region = region

        botocore_session = botocore.session.get_session()
        self.request_signer = RequestSigner(
            ServiceId("elasticache"),
            self.region,
            "elasticache",
            "v4",
            botocore_session.get_credentials(),
            botocore_session.get_component("event_emitter"),
        )

    # Generated IAM tokens are valid for 15 minutes
    @cached(cache=TTLCache(maxsize=128, ttl=900))
    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return ``(user, token)``, where the token is the presigned URL minus its scheme.

        Results are served from a ``TTLCache`` (``maxsize=128``, ``ttl=900``)
        attached through the ``cached`` decorator.
        """
        encoded_query = urlencode({"Action": "connect", "User": self.user})
        unsigned_url = urlunparse(
            ParseResult(
                scheme="https",
                netloc=self.cluster_name,
                path="/",
                params="",
                query=encoded_query,
                fragment="",
            )
        )
        request = {
            "method": "GET",
            "url": unsigned_url,
            "body": {},
            "headers": {},
            "context": {},
        }
        presigned = self.request_signer.generate_presigned_url(
            request,
            operation_name="connect",
            expires_in=900,
            region_name=self.region,
        )
        # RequestSigner only seems to work if the URL has a protocol, but
        # Elasticache only accepts the URL without a protocol
        # So strip it off the signed URL before returning
        token = presigned.removeprefix("https://")
        return (self.user, token)
# User name passed to the provider as `user`.
username = "barshaul"
# Cluster name passed to the provider (it becomes the netloc of the signed URL).
cluster_name = "test-001"
# Host the Redis client actually connects to.
endpoint = "test-001.use1.cache.amazonaws.com"
# region is left at the provider's default ("us-east-1").
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
user_connection = redis.Redis(host=endpoint, port=6379, credential_provider=creds_provider)
# Round-trip check using the provider-supplied credentials; the cell output below shows True.
user_connection.ping()
[4]:
True
Connecting to Redis instances by specifying a URL scheme.#
Parameters are passed to the following schemes as parameters of the URL.
Three URL schemes are supported:
redis://
creates a TCP socket connection. https://www.iana.org/assignments/uri-schemes/prov/redis
rediss://
creates an SSL-wrapped TCP socket connection. https://www.iana.org/assignments/uri-schemes/prov/rediss
unix://
creates a Unix Domain Socket connection.
[7]:
# Build the client from a redis:// URL; decode_responses and
# health_check_interval are supplied as URL query parameters.
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&health_check_interval=2")
# Round-trip check; the cell output below shows True.
url_connection.ping()
[7]:
True
Connecting to Redis instances by specifying a URL scheme and the RESP3 protocol.#
[ ]:
# Same URL form as the previous example, with protocol=3 added to the query
# string to select RESP3.
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&health_check_interval=2&protocol=3")
# Round-trip check on the URL-configured connection.
url_connection.ping()
Connecting to a Sentinel instance#
[ ]:
from redis.sentinel import Sentinel
# One sentinel address (localhost:26379) with a 0.1 socket timeout.
sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
# Look up the master registered under the name "redis-py-test".
# NOTE(review): presumably returns the master's (host, port) address -- confirm
# against the redis-py Sentinel documentation.
sentinel.discover_master("redis-py-test")