This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
from enum import Enum
from random import random
from typing import NamedTuple, Optional
import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.errors import DagsterInvalidDefinitionError
[docs]class Backoff(Enum):
"""A modifier for delay as a function of attempt number.
LINEAR: `attempt_num * delay`
EXPONENTIAL: `((2 ^ attempt_num) - 1) * delay`
"""
LINEAR = "LINEAR"
EXPONENTIAL = "EXPONENTIAL"
[docs]class Jitter(Enum):
"""A randomizing modifier for delay, applied after backoff calculation.
FULL: between 0 and the calculated delay based on backoff: `random() * backoff_delay`
PLUS_MINUS: +/- the delay: `backoff_delay + ((2 * (random() * delay)) - delay)`
"""
FULL = "FULL"
PLUS_MINUS = "PLUS_MINUS"
[docs]class RetryPolicy(
NamedTuple(
"_RetryPolicy",
[
("max_retries", PublicAttr[int]),
("delay", PublicAttr[Optional[check.Numeric]]),
# declarative time modulation to allow calc witout running user function
("backoff", PublicAttr[Optional[Backoff]]),
("jitter", PublicAttr[Optional[Jitter]]),
],
),
):
"""A declarative policy for when to request retries when an exception occurs during op execution.
Args:
max_retries (int):
The maximum number of retries to attempt. Defaults to 1.
delay (Optional[Union[int,float]]):
The time in seconds to wait between the retry being requested and the next attempt
being started. This unit of time can be modulated as a function of attempt number
with backoff and randomly with jitter.
backoff (Optional[Backoff]):
A modifier for delay as a function of retry attempt number.
jitter (Optional[Jitter]):
A randomizing modifier for delay, applied after backoff calculation.
"""
def __new__(
cls,
max_retries: int = 1,
delay: Optional[check.Numeric] = None,
backoff: Optional[Backoff] = None,
jitter: Optional[Jitter] = None,
):
if backoff is not None and delay is None:
raise DagsterInvalidDefinitionError(
"Can not set jitter on RetryPolicy without also setting delay"
)
if jitter is not None and delay is None:
raise DagsterInvalidDefinitionError(
"Can not set backoff on RetryPolicy without also setting delay"
)
return super().__new__(
cls,
max_retries=check.int_param(max_retries, "max_retries"),
delay=check.opt_numeric_param(delay, "delay"),
backoff=check.opt_inst_param(backoff, "backoff", Backoff),
jitter=check.opt_inst_param(jitter, "jitter", Jitter),
)
def calculate_delay(self, attempt_num: int) -> check.Numeric:
return calculate_delay(
attempt_num=attempt_num,
backoff=self.backoff,
jitter=self.jitter,
base_delay=self.delay or 0,
)
def calculate_delay(
attempt_num: int, backoff: Optional[Backoff], jitter: Optional[Jitter], base_delay: float
) -> float:
if backoff is Backoff.EXPONENTIAL:
calc_delay = ((2**attempt_num) - 1) * base_delay
elif backoff is Backoff.LINEAR:
calc_delay = base_delay * attempt_num
elif backoff is None:
calc_delay = base_delay
else:
check.assert_never(backoff)
if jitter is Jitter.FULL:
calc_delay = random() * calc_delay
elif jitter is Jitter.PLUS_MINUS:
calc_delay = calc_delay + ((2 * (random() * base_delay)) - base_delay)
elif jitter is None:
pass
else:
check.assert_never(jitter)
return calc_delay