You are viewing an outdated version of the documentation.

This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.

Source code for dagster._core.definitions.policy

from enum import Enum
from random import random
from typing import NamedTuple, Optional

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._core.errors import DagsterInvalidDefinitionError


[docs]class Backoff(Enum): """A modifier for delay as a function of attempt number. LINEAR: `attempt_num * delay` EXPONENTIAL: `((2 ^ attempt_num) - 1) * delay` """ LINEAR = "LINEAR" EXPONENTIAL = "EXPONENTIAL"
[docs]class Jitter(Enum): """A randomizing modifier for delay, applied after backoff calculation. FULL: between 0 and the calculated delay based on backoff: `random() * backoff_delay` PLUS_MINUS: +/- the delay: `backoff_delay + ((2 * (random() * delay)) - delay)` """ FULL = "FULL" PLUS_MINUS = "PLUS_MINUS"
[docs]class RetryPolicy( NamedTuple( "_RetryPolicy", [ ("max_retries", PublicAttr[int]), ("delay", PublicAttr[Optional[check.Numeric]]), # declarative time modulation to allow calc witout running user function ("backoff", PublicAttr[Optional[Backoff]]), ("jitter", PublicAttr[Optional[Jitter]]), ], ), ): """A declarative policy for when to request retries when an exception occurs during op execution. Args: max_retries (int): The maximum number of retries to attempt. Defaults to 1. delay (Optional[Union[int,float]]): The time in seconds to wait between the retry being requested and the next attempt being started. This unit of time can be modulated as a function of attempt number with backoff and randomly with jitter. backoff (Optional[Backoff]): A modifier for delay as a function of retry attempt number. jitter (Optional[Jitter]): A randomizing modifier for delay, applied after backoff calculation. """ def __new__( cls, max_retries: int = 1, delay: Optional[check.Numeric] = None, backoff: Optional[Backoff] = None, jitter: Optional[Jitter] = None, ): if backoff is not None and delay is None: raise DagsterInvalidDefinitionError( "Can not set jitter on RetryPolicy without also setting delay" ) if jitter is not None and delay is None: raise DagsterInvalidDefinitionError( "Can not set backoff on RetryPolicy without also setting delay" ) return super().__new__( cls, max_retries=check.int_param(max_retries, "max_retries"), delay=check.opt_numeric_param(delay, "delay"), backoff=check.opt_inst_param(backoff, "backoff", Backoff), jitter=check.opt_inst_param(jitter, "jitter", Jitter), ) def calculate_delay(self, attempt_num: int) -> check.Numeric: return calculate_delay( attempt_num=attempt_num, backoff=self.backoff, jitter=self.jitter, base_delay=self.delay or 0, )
def calculate_delay( attempt_num: int, backoff: Optional[Backoff], jitter: Optional[Jitter], base_delay: float ) -> float: if backoff is Backoff.EXPONENTIAL: calc_delay = ((2**attempt_num) - 1) * base_delay elif backoff is Backoff.LINEAR: calc_delay = base_delay * attempt_num elif backoff is None: calc_delay = base_delay else: check.assert_never(backoff) if jitter is Jitter.FULL: calc_delay = random() * calc_delay elif jitter is Jitter.PLUS_MINUS: calc_delay = calc_delay + ((2 * (random() * base_delay)) - base_delay) elif jitter is None: pass else: check.assert_never(jitter) return calc_delay