dlt._workspace.deployment.interval
Interval computation and run-based upstream freshness checks.
next_scheduled_run
def next_scheduled_run(
trigger: TTrigger,
now_reference: datetime,
tz: str = "UTC",
prev_scheduled_run: Optional[datetime] = None) -> datetime
Compute the next scheduled run for a timed trigger.
Returns the UTC datetime when the job should next run.
Arguments:
trigger- Aschedule:,every:, oronce:trigger.now_reference- UTC reference time.tz- IANA timezone for cron evaluation.prev_scheduled_run- When the previous run was scheduled (forevery:).
Raises:
InvalidTrigger- If trigger is not a timed type.
compute_run_interval
def compute_run_interval(trigger: TTrigger,
now: datetime,
prev_interval_end: Optional[datetime],
tz: str = "UTC") -> TTimeInterval
Half-open [start, end) interval for a non-interval job run.
schedule: and every: triggers carry continuity: prev_interval_end
extends start backward to fill gaps (missed ticks, refresh cascade).
All other trigger types return a point-in-time interval regardless of
prev_interval_end — they model one-shot / event dispatches whose
work-window has no meaningful "since last run" semantic.
schedule:<cron>→ most recently ELAPSED cron interval:[cron_prev(cron_floor(now)), cron_floor(now)).prev_interval_end(if set) overridesstart. In steady state those match.every:<period>→[prev_interval_end, now)if set, else[now - period, now).once:<datetime>→ always[once, once).manual:/http:/webhook:/tag:/deployment:/job.success:/job.fail:/pipeline_name:→ always[now, now).
Arguments:
trigger- Any normalized trigger string.now- Reference upper bound (typically the dispatch /started_attime).prev_interval_end- Last successful work-window end, orNone. Only applies toschedule:andevery:triggers.tz- IANA timezone for cron evaluation. Used only forschedule:.
Returns:
TTimeInterval- Half-open[start, end)tuple, both UTC stdlibdatetime.
Raises:
InvalidTrigger- Iftriggercannot be parsed.
resolve_interval_spec
def resolve_interval_spec(spec: TIntervalSpec,
cron_expr: str,
tz: str = "UTC") -> TTimeInterval
Resolve a TIntervalSpec into a concrete TTimeInterval in UTC.
start is required and snapped backward to the latest cron tick <= start.
end defaults to now, also snapped backward. Cron ticks are evaluated in
tz so DST-sensitive expressions work correctly; the returned datetimes
are always UTC.
cron_floor
def cron_floor(cron_expr: str, dt: datetime) -> datetime
Latest cron tick <= dt, preserving dt's timezone.
Iterates cron in naive local time (stripping dt's tzinfo internally) to
get clean wall-clock semantics across DST transitions, then re-attaches
dt.tzinfo to the result.
cron_lag
def cron_lag(cron_expr: str, dt: datetime, count: int) -> datetime
Floor dt to the latest cron tick, then step count ticks into the past.
count=0 returns the floor tick (dt itself when it falls on a tick).
Negative count steps into the future: count=-1 is the first tick strictly
after the floor. Preserves dt's timezone using the same wall-clock
iteration as cron_floor.
lag_interval
def lag_interval(interval: TTimeInterval,
trigger: Union[str, TTrigger],
count: int = 1,
lag_end: bool = False) -> TTimeInterval
Lag interval start (or end with lag_end) by count trigger ticks into the past.
count=0 snaps the bound to the tick floor, negative count moves it into
the future. For every: triggers the bound shifts by period * count.
Arguments:
interval- The interval to adjust.trigger- Aschedule:orevery:trigger, or a bare cron expression.count- Number of ticks (or periods) to lag, negative moves into the future.lag_end- WhenTrue, adjusts the end instead of the start.
Raises:
ValueError- If the adjusted interval is empty or negative.InvalidTrigger- If the trigger has no time period.
full_days_interval
def full_days_interval(interval: TTimeInterval) -> TTimeInterval
Widen interval to full days: start floored to midnight, end extended to the next midnight. Each bound is adjusted in its own timezone.
is_cron_expression
def is_cron_expression(s: str) -> bool
Check if string is a valid cron expression.
check_all_upstream_run_fresh
def check_all_upstream_run_fresh(
freshness_constraints: List[TFreshnessConstraint],
all_jobs: Dict[str, TJobDefinition],
prev_interval_ends: Dict[str, Optional[datetime]],
now_utc: Optional[datetime] = None) -> Tuple[bool, List[str]]
Check run-based freshness constraints against each upstream's last-completed
interval_end.
prev_interval_ends[upstream_ref] is the interval_end of the upstream's last
successful run (None if it has never completed or its freshness state was
reset). Dispatches per-upstream based on the upstream's default_trigger type:
schedule:withoutinterval→interval_endcovers the most recently elapsed cron tickevery:→interval_endis within the previous period- event/manual →
interval_endis set (any completion is enough)