Source code for swf_typed._decisions

"""SWF decision task management."""

import abc
import datetime
import warnings
import functools
import dataclasses
import typing as t

from . import _common
from . import _executions

if t.TYPE_CHECKING:
    import botocore.client
    from . import _tasks
    from . import _history
    from . import _workflows
    from . import _activities


[docs] @dataclasses.dataclass class Decision(_common.Serialisable, metaclass=abc.ABCMeta): """Decider decision.""" type: t.ClassVar[str] """Decision type name."""
[docs] @abc.abstractmethod def to_api(self): return {"decisionType": self.type}
[docs] @dataclasses.dataclass class CancelTimerDecision(Decision): """Cancel timer decider decision.""" type: t.ClassVar[str] = "CancelTimer" timer_id: str """Timer ID."""
[docs] def to_api(self): data = super().to_api() data["cancelTimerDecisionAttributes"] = {"timerId": self.timer_id} return data
[docs] @dataclasses.dataclass class CancelWorkflowExecutionDecision(Decision): """Cancel workflow execution decider decision.""" type: t.ClassVar[str] = "CancelWorkflowExecution" details: str = None """Execution cancellation details, usually for explanation."""
[docs] def to_api(self): data = super().to_api() if self.details or self.details == "": data["cancelWorkflowExecutionDecisionAttributes"] = { "details": self.details } return data
[docs] @dataclasses.dataclass class CompleteWorkflowExecutionDecision(Decision): """Complete workflow execution decider decision.""" type: t.ClassVar[str] = "CompleteWorkflowExecution" execution_result: str = None """Execution result."""
[docs] def to_api(self): data = super().to_api() if self.execution_result or self.execution_result == "": data["completeWorkflowExecutionDecisionAttributes"] = { "result": self.execution_result, } return data
[docs] @dataclasses.dataclass class ContinueAsNewWorkflowExecutionDecision(Decision): """Continue as new workflow execution decider decision.""" type: t.ClassVar[str] = "ContinueAsNewWorkflowExecution" execution_input: str = None """Continuing execution input.""" workflow_version: str = None """Continuing execution workflow version.""" execution_configuration: "_executions.PartialExecutionConfiguration" = None """Continuing execution configuration overrides.""" tags: t.List[str] = None """Continuing execution tags."""
[docs] def to_api(self): data = super().to_api() attr_key = "continueAsNewWorkflowExecutionDecisionAttributes" if self.execution_input or self.execution_input == "": data.setdefault(attr_key, {})["input"] = self.execution_input if self.workflow_version or self.workflow_version == "": data.setdefault(attr_key, {})["workflowTypeVersion"] = self.workflow_version if self.execution_configuration: execution_configuration_data = self.execution_configuration.get_api_args() data.setdefault(attr_key, {}).update(execution_configuration_data) if self.tags or self.tags == []: data.setdefault(attr_key, {})["tagList"] = self.tags return data
[docs] @dataclasses.dataclass class FailWorkflowExecutionDecision(Decision): """Fail workflow execution decider decision.""" type: t.ClassVar[str] = "FailWorkflowExecution" reason: str = None """Execution failure reason, usually for classification.""" details: str = None """Execution failure details, usually for explanation."""
[docs] def to_api(self): data = super().to_api() if self.reason or self.details: data["failWorkflowExecutionDecisionAttributes"] = decision_attributes = {} if self.reason or self.reason == "": decision_attributes["reason"] = self.reason if self.details or self.details == "": decision_attributes["details"] = self.details return data
[docs] @dataclasses.dataclass class RecordMarkerDecision(Decision): """Record marker decider decision.""" type: t.ClassVar[str] = "RecordMarker" marker_name: str """Marker name.""" details: str = None """Attached marker data."""
[docs] def to_api(self): data = super().to_api() data["recordMarkerDecisionAttributes"] = {"markerName": self.marker_name} if self.details or self.details == "": data["recordMarkerDecisionAttributes"]["details"] = self.details return data
[docs] @dataclasses.dataclass class RequestCancelActivityTaskDecision(Decision): """Cancel activity task request decider decision.""" type: t.ClassVar[str] = "RequestCancelActivityTask" task_id: str """ID of task to cancel."""
[docs] def to_api(self): data = super().to_api() data["requestCancelActivityTaskDecisionAttributes"] = { "activityId": self.task_id, } return data
[docs] @dataclasses.dataclass class RequestCancelExternalWorkflowExecutionDecision(Decision): """Cancel external workflow execution request decider decision.""" type: t.ClassVar[str] = "RequestCancelExternalWorkflowExecution" execution: t.Union["_executions.ExecutionId", "_executions.CurrentExecutionId"] """ID of execution to cancel.""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() attr_key = "requestCancelExternalWorkflowExecutionDecisionAttributes" data[attr_key] = self.execution.to_api() if self.control or self.control == "": data[attr_key]["control"] = self.control return data
[docs] @dataclasses.dataclass class ScheduleActivityTaskDecision(Decision): """Schedule activity task decider decision.""" type: t.ClassVar[str] = "ScheduleActivityTask" activity: "_activities.ActivityId" """Task activity.""" task_id: str """Task ID.""" task_input: str = None """Task input.""" task_configuration: "_tasks.PartialTaskConfiguration" = None """Task configuration overrides.""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() data["scheduleActivityTaskDecisionAttributes"] = decision_attributes = { "activityType": self.activity.to_api(), "activityId": self.task_id, } if self.task_input or self.task_input == "": decision_attributes["input"] = self.task_input if self.control or self.control == "": decision_attributes["control"] = self.control if self.task_configuration: task_configuration_data = self.task_configuration.get_api_args() decision_attributes.update(task_configuration_data) return data
[docs] @dataclasses.dataclass class ScheduleLambdaFunctionDecision(Decision): """Schedule Lambda function invocation decider decision.""" type: t.ClassVar[str] = "ScheduleLambdaFunction" lambda_function: str """Lambda function name or ARN (latest/version/alias) to invoke.""" task_id: str """Task ID.""" task_input: str = None """Lambda function input.""" task_timeout: datetime.timedelta = _common.unset """Lambda function invocation timeout.""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() data["scheduleLambdaFunctionDecisionAttributes"] = decision_attributes = { "lambda": self.lambda_function, "id": self.task_id, } if self.task_input or self.task_input == "": decision_attributes["input"] = self.task_input if self.control or self.control == "": decision_attributes["control"] = self.control if self.task_timeout or self.task_timeout == datetime.timedelta(0): decision_attributes["startToCloseTimeout"] = str( int(self.task_timeout.total_seconds()) ) return data
[docs] @dataclasses.dataclass class SignalExternalWorkflowExecutionDecision(Decision): """Signal external workflow execution decider decision.""" type: t.ClassVar[str] = "SignalExternalWorkflowExecution" execution: t.Union["_executions.ExecutionId", "_executions.CurrentExecutionId"] """ID of execution to signal.""" signal: str """Signal name.""" signal_input: str = None """Signal input.""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() attr_key = "signalExternalWorkflowExecutionDecisionAttributes" data[attr_key] = self.execution.to_api() data[attr_key]["signalName"] = self.signal if self.signal_input or self.signal_input == "": data[attr_key]["input"] = self.signal_input if self.control or self.control == "": data[attr_key]["control"] = self.control return data
[docs] @dataclasses.dataclass class StartChildWorkflowExecutionDecision(Decision): """Start child workflow execution decider decision.""" type: t.ClassVar[str] = "StartChildWorkflowExecution" workflow: "_workflows.WorkflowId" """Child execution workflow.""" execution: "_executions.CurrentExecutionId" """Child execution workflow-ID.""" execution_input: str = None """Child execution input.""" execution_configuration: "_executions.PartialExecutionConfiguration" = None """Child execution configuration overrides.""" tags: t.List[str] = None """Child execution tags""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() data["startChildWorkflowExecutionDecisionAttributes"] = decision_attributes = { "workflowType": self.workflow.to_api(), "workflowId": self.execution.id, } if self.execution_input or self.execution_input == "": decision_attributes["input"] = self.execution_input if self.execution_configuration: decision_attributes.update(self.execution_configuration.get_api_args()) if self.control or self.control == "": decision_attributes["control"] = self.control if self.tags or self.tags == []: decision_attributes["tagList"] = self.tags return data
[docs] @dataclasses.dataclass class StartTimerDecision(Decision): """Start timer decider decision.""" type: t.ClassVar[str] = "StartTimer" timer_id: str """Timer ID.""" timer_duration: datetime.timedelta """Timer duration.""" control: str = None """Message for future deciders."""
[docs] def to_api(self): data = super().to_api() data["startTimerDecisionAttributes"] = { "timerId": self.timer_id, "startToFireTimeout": str(int(self.timer_duration.total_seconds())), } if self.control or self.control == "": data["startTimerDecisionAttributes"]["control"] = self.control return data
[docs] @dataclasses.dataclass class DecisionTask(_common.Deserialisable): """Decider decision task.""" token: str """Task token, provided by SWF.""" execution: "_executions.ExecutionId" """Execution which decisions are being made for.""" workflow: "_workflows.WorkflowId" """Execution workflow.""" _execution_history_iter: t.Iterable["_history.Event"] decision_task_started_execution_history_event_id: int """History event ID for decision-task start.""" previous_decision_task_started_execution_history_event_id: int = None """History event ID for previous decision-task start.""" _execution_history_list: t.List["_history.Event"] = dataclasses.field(init=False) def __post_init__(self): self._execution_history_list = []
[docs] @classmethod def from_api( cls, data, execution_history_iter: t.Iterable["_history.Event"] = None ) -> "DecisionTask": """Deserialise decision task from SWF API response data. Args: data: SWF API response decision task data execution_history_iter: execution history events, for lazy handling of paginated history. Default: get from response data """ from . import _workflows from . import _executions if not execution_history_iter: from . import _history execution_history_iter = ( _history.Event.from_api(d) for d in data["events"] ) return cls( token=data["taskToken"], execution=_executions.ExecutionId.from_api(data["workflowExecution"]), workflow=_workflows.WorkflowId.from_api(data["workflowType"]), _execution_history_iter=execution_history_iter, decision_task_started_execution_history_event_id=data["startedEventId"], previous_decision_task_started_execution_history_event_id=data.get( "previousStartedEventId" ), )
@property def execution_history_iter(self) -> t.Generator["_history.Event", None, None]: """Execution history events iterable.""" for event in self._execution_history_iter: self._execution_history_list.append(event) yield event @property def execution_history(self) -> t.List["_history.Event"]: """Execution history events.""" self._execution_history_list += list(self._execution_history_iter) return self._execution_history_list
[docs] def get_number_of_pending_decision_tasks( task_list: str, domain: str, client: "botocore.client.BaseClient" = None, ) -> int: """Get the number of pending decision tasks. Warns if the number of pending tasks is greater than what's returned. Args: task_list: decision task-list domain: domain of task-list client: SWF client Returns: number of pending tasks """ client = _common.ensure_client(client) response = client.count_pending_decision_tasks( domain=domain, taskList=dict(name=task_list) ) if response["truncated"]: warnings.warn("Actual task count greater than returned amount") return response["count"]
[docs] def request_decision_task( task_list: str, domain: str, decider_identity: str = None, no_tasks_callback: t.Callable[[], None] = None, client: "botocore.client.BaseClient" = None, ) -> DecisionTask: """Request (poll for) a decision task; blocks until task is received. Workflow execution history events are retrieved semi-lazily. Args: task_list: decision task-list to request from domain: domain of task-list decider_identity: decider identity, recorded in execution history no_tasks_callback: called after no tasks were provided by SWF client: SWF client Returns: decision task """ from . import _history def iter_history() -> t.Generator["_history.Event", None, None]: r = response while r.get("nextPageToken"): future = _common.executor.submit(call, nextPageToken=r["nextPageToken"]) yield from (_history.Event.from_api(d) for d in r.get("events") or []) r = future.result() yield from (_history.Event.from_api(d) for d in r.get("events") or []) client = _common.ensure_client(client) kw = {} if decider_identity or decider_identity == "": kw["identity"] = decider_identity call = functools.partial( client.poll_for_decision_task, domain=domain, taskList=dict(name=task_list), **kw, ) with _common.polling_socket_timeout(): while True: response = call() if response["taskToken"]: break no_tasks_callback() return DecisionTask.from_api(response, iter_history())
[docs] def send_decisions( token: str, decisions: t.List[Decision], context: str = None, client: "botocore.client.BaseClient" = None, ) -> None: """Make decisions for a workflow execution, completing decision task. Args: token: decision task identifying token decisions: decisions to make context: workflow execution context to set client: SWF client """ client = _common.ensure_client(client) kw = {} if context or context == "": kw["executonContext"] = context decisions_data = [d.to_api() for d in decisions] client.respond_decision_task_completed( taskToken=token, decisions=decisions_data, **kw )