Python SDK developer's guide - Features

The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.

WORK IN PROGRESS

This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.

If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.

In this section you can find the following:

Signals

A Signal is an asynchronous message sent to a running Workflow Execution.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

A Signal has a name and can have arguments.

  • The name, also called a Signal type, is a string.
  • The arguments must be serializable.

To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.

Customize name

Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.
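
To illustrate why a single data class argument is easier to evolve than several positional arguments, here is a minimal sketch in plain Python with no SDK calls. GreetingSignalInput, its fields, and describe() are hypothetical names; in a real Workflow the object would be the single argument of a method decorated with @workflow.signal.

```python
from dataclasses import dataclass


@dataclass
class GreetingSignalInput:
    # Hypothetical payload for a Signal such as submit_greeting.
    name: str
    # Field added later; the default keeps older senders working.
    language: str = "en"


def describe(payload: GreetingSignalInput) -> str:
    # Stand-in for the body of a Signal handler.
    return f"{payload.name} ({payload.language})"


old_style = GreetingSignalInput(name="User 1")
new_style = GreetingSignalInput(name="User 2", language="fr")
print(describe(old_style))
print(describe(new_style))
```

Senders that only know about the original field keep working, because the new field has a default value.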

Return values from Signal methods are ignored.

You can pass a name parameter to the decorator to customize the Signal's name; otherwise, it defaults to the unqualified method __name__.

from temporalio import workflow
# ...
    @workflow.signal
    async def submit_greeting(self, name: str) -> None:
        await self._pending_greetings.put(name)

    @workflow.signal
    def exit(self) -> None:
        # ...

    @workflow.signal(name="Custom Signal Name")
    async def custom_signal(self, name: str) -> None:
        await self._pending_greetings.put(name)

Workflows listen for Signals by the Signal's name.

To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.

from temporalio.client import Client
# ...
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

Send Signal from Client

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

To send a Signal from the Client, use the signal() function on the Workflow handle.

To get the Workflow handle, you can use the handle that start_workflow() returns, as in the following example.

from temporalio.client import Client
# ...
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
    GreetingWorkflow.run,
    id="your-greeting-workflow",
    task_queue="signal-tq",
)
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

Send Signal from Workflow

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

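When an External Signal is sent:

  • A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
  • A WorkflowExecutionSignaled Event appears in the recipient's Event History.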

Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier. Use get_external_workflow_handle when you don't know the type of the other Workflow.

note

The Workflow Type passed is only for type annotations and not for validation.

# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")

Signal-With-Start

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.

from temporalio.client import Client
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )

Queries

A Query is a synchronous operation that is used to get the state of a Workflow Execution.

Define Query

A Query has a name and can have arguments.

To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.

Customize names

You can pass a name parameter to the decorator to customize the Query's name; otherwise, it defaults to the unqualified method __name__.

note

You can either set the name or the dynamic parameter in a Query's decorator, but not both.

# ...
    @workflow.query
    def greeting(self) -> str:
        return self._greeting

Handle Query

Queries are handled by your Workflow.

Don't include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

To send a Query to the Workflow, use the query method from the WorkflowHandle class.

# ...
result = await handle.query(GreetingWorkflow.greeting)

Send Query

Queries are sent from a Temporal Client.

To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.

# ...
result = await handle.query(GreetingWorkflow.greeting)

Workflow timeouts

Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.

Workflow timeouts are set when starting the Workflow Execution.

Set the timeout from either the start_workflow() or execute_workflow() asynchronous methods.

Available timeouts are:

  • execution_timeout
  • run_timeout
  • task_timeout

from datetime import timedelta
# ...
handle = await client.start_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    # Set Workflow Timeout duration
    execution_timeout=timedelta(seconds=2),
    # run_timeout=timedelta(seconds=2),
    # task_timeout=timedelta(seconds=2),
)
# Alternatively, execute_workflow() starts the Workflow and returns its result, not a handle.
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    # Set Workflow Timeout duration
    execution_timeout=timedelta(seconds=2),
    # run_timeout=timedelta(seconds=2),
    # task_timeout=timedelta(seconds=2),
)

Workflow retries

A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Use a Retry Policy to retry a Workflow Execution in the event of a failure.

Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.

Set the Retry Policy from either the start_workflow() or execute_workflow() asynchronous methods.

from datetime import timedelta

from temporalio.common import RetryPolicy
# ...
handle = await client.start_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
# Alternatively, execute_workflow() starts the Workflow and returns its result, not a handle.
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)

Activity timeouts

Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.

The following timeouts are available in the Activity Options.

An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.

Activity options are set as keyword arguments after the Activity arguments.

Available timeouts are:

  • schedule_to_close_timeout
  • schedule_to_start_timeout
  • start_to_close_timeout

from datetime import timedelta

from temporalio import workflow
# ...
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            name,
            schedule_to_close_timeout=timedelta(seconds=5),
            # schedule_to_start_timeout=timedelta(seconds=5),
            # start_to_close_timeout=timedelta(seconds=5),
        )

Activity retries

A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.

To create an Activity Retry Policy in Python, pass a RetryPolicy instance as the retry_policy argument of the start_activity() or execute_activity() function.

The following example sets the maximum interval to 2 seconds.

await workflow.execute_activity(
    your_activity,
    name,
    start_to_close_timeout=timedelta(seconds=10),
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
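
To see what a maximum interval does to the waits between retries, the following sketch computes a backoff series in plain Python. retry_intervals() is a hypothetical helper, not an SDK function. It assumes exponential backoff with an initial interval of 1 second, a backoff coefficient of 2.0, and a cap of 100 times the initial interval when no maximum is given.

```python
from datetime import timedelta
from typing import List, Optional


def retry_intervals(
    retries: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each of the first `retries` retries."""
    if maximum_interval is None:
        # Assumed default cap: 100 times the initial interval.
        maximum_interval = initial_interval * 100
    waits = []
    for retry in range(retries):
        # Exponential growth, capped at the maximum interval.
        wait = initial_interval * (backoff_coefficient ** retry)
        waits.append(min(wait, maximum_interval))
    return waits


capped = retry_intervals(4, maximum_interval=timedelta(seconds=2))
uncapped = retry_intervals(4)
print([w.total_seconds() for w in capped])
print([w.total_seconds() for w in uncapped])
```

Under these assumptions, a 2-second maximum interval flattens the series after the second retry, while the uncapped series keeps doubling.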

Activity Heartbeats

An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster; they may be throttled by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.

Heartbeats can contain a details field describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.

To Heartbeat an Activity Execution in Python, use the heartbeat() API.

from temporalio import activity

@activity.defn
async def your_activity_definition() -> str:
    activity.heartbeat("heartbeat details!")
    return "your result"

In addition to obtaining cancellation information, Heartbeats also support detail data that persists on the server for retrieval during Activity retry. If an Activity calls heartbeat(123, 456) and then fails and is retried, activity.info().heartbeat_details returns an iterable containing 123 and 456 on the next attempt.
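
The resume-from-last-Heartbeat pattern can be simulated without the SDK. In this sketch, process_items() is a hypothetical Activity body. The heartbeat callable and the heartbeat_details sequence stand in for activity.heartbeat() and activity.info().heartbeat_details, and the worker failure is simulated.

```python
from typing import Callable, List, Sequence


def process_items(
    items: Sequence[str],
    heartbeat_details: Sequence[int],
    heartbeat: Callable[[int], None],
    fail_at: int = -1,
) -> List[str]:
    # Resume after the last index recorded by a previous attempt, if any.
    start = heartbeat_details[0] + 1 if heartbeat_details else 0
    done = []
    for index in range(start, len(items)):
        if index == fail_at:
            raise RuntimeError("simulated worker failure")
        done.append(items[index].upper())
        # Record progress so that a retry can skip finished items.
        heartbeat(index)
    return done


recorded: List[int] = []  # stands in for details stored by the Cluster
items = ["a", "b", "c", "d"]
try:
    process_items(items, [], recorded.append, fail_at=2)
except RuntimeError:
    pass
# The second attempt receives only the last recorded details.
second = process_items(items, recorded[-1:], recorded.append)
print(second)
```

The second attempt skips the two items that were already processed and picks up at the third.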

Heartbeat Timeout

A Heartbeat Timeout works in conjunction with Activity Heartbeats.

heartbeat_timeout is a keyword argument of the start_activity() function that sets the maximum time between Activity Heartbeats.

workflow.start_activity(
    activity="your-activity",
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)

execute_activity() is a shortcut for start_activity() that waits on its result.

To get just the handle to wait and cancel separately, use start_activity(). execute_activity() should be used in most cases unless advanced task capabilities are needed.

workflow.execute_activity(
    "your-activity",
    name,
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)

Asynchronous Activity Completion

Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.

There are three steps to follow:

  1. The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
  2. The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
  3. The Temporal Client is used to Heartbeat and complete the Activity.

To mark an Activity as completing asynchronously, do the following inside the Activity.

# Capture token for later completion
captured_token = activity.info().task_token
activity.raise_complete_async()

To update an Activity outside the Activity, use the get_async_activity_handle() method to get the handle of the Activity.

handle = my_client.get_async_activity_handle(task_token=captured_token)

Then, on that handle, you can call the heartbeat, complete, fail, or report_cancellation method to update the Activity.

await handle.complete("Completion value.")

Cancel an Activity

Canceling an Activity from within a Workflow requires that the Activity Execution sends Heartbeats and sets a Heartbeat Timeout. If the Heartbeat is not invoked, the Activity cannot receive a cancellation request. When any non-immediate Activity is executed, the Activity Execution should send Heartbeats and set a Heartbeat Timeout to ensure that the server knows it is still working.

When an Activity is canceled, an error is raised in the Activity at the next available opportunity. If cleanup logic needs to be performed, it can be done in a finally clause or inside a caught cancel error. However, for the Activity to appear canceled the exception needs to be re-raised.

note

Unlike regular Activities, Local Activities can be canceled even if they don't send Heartbeats. Local Activities are handled locally, and all the information needed to handle the cancellation logic is available in the same Worker process.

To cancel an Activity from a Workflow Execution, call the cancel() method on the Activity handle that is returned from start_activity().

@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
    try:
        while True:
            print("Heartbeating cancel activity")
            await asyncio.sleep(0.5)
            activity.heartbeat("some details")
    except asyncio.CancelledError:
        print("Activity cancelled")
        raise


@activity.defn
async def run_activity(input: ComposeArgsInput):
    print("Executing activity")
    return input.arg1 + input.arg2


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, input: ComposeArgsInput) -> None:
        activity_handle = workflow.start_activity(
            cancellable_activity,
            ComposeArgsInput(input.arg1, input.arg2),
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30),
        )

        await asyncio.sleep(3)
        # Request cancellation; it is delivered when the Activity next Heartbeats.
        activity_handle.cancel()

note

The Activity handle is a Python task. By calling cancel(), you're essentially requesting the task to be canceled.
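
Because the handle behaves like a Python task, plain asyncio shows the same request-then-clean-up flow. The sketch below uses only the standard library. worker() is a hypothetical stand-in for a long-running Activity, and no Temporal calls are involved.

```python
import asyncio
from typing import List


async def worker(log: List[str]) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cleanup goes here; re-raise so the task is reported as cancelled.
        log.append("cleanup")
        raise


async def main() -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)
    # cancel() only requests cancellation; it takes effect at the next await.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


events = asyncio.run(main())
print(events)
```

If worker() swallowed the CancelledError instead of re-raising it, the task would finish normally and the caller would not see it as cancelled.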

Child Workflows

A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.

When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, and so on) are logged in the Workflow Execution Event History.

Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started. After that, Child Workflow Executions may be abandoned by setting the Abandon Parent Close Policy in the Child Workflow Options.

To be sure that the Child Workflow Execution has started in Python, await the call to start_child_workflow(), which returns a handle once the Child Workflow Execution has started. Awaiting that handle then waits for the Child Workflow Execution to complete.

To spawn a Child Workflow Execution in Python, use the execute_child_workflow() function. execute_child_workflow() starts the Child Workflow and waits for completion.

await workflow.execute_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")

Alternatively, use the start_child_workflow() function to start a Child Workflow and return its handle. This is useful if you want to do something after it has only started, or to get the workflow/run ID, or to be able to signal it while running. To wait for completion, simply await the handle. execute_child_workflow() is a helper function for start_child_workflow() + await handle.

handle = await workflow.start_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")

Parent Close Policy

A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).

The default Parent Close Policy option is set to terminate the Child Workflow Execution.

Set the parent_close_policy parameter inside the start_child_workflow function or the execute_child_workflow() function to specify the behavior of the Child Workflow when the Parent Workflow closes.

async def run(self, name: str) -> str:
    return await workflow.execute_child_workflow(
        ComposeGreeting.run,
        ComposeGreetingInput("Hello", name),
        id="hello-child-workflow-workflow-child-id",
        parent_close_policy=workflow.ParentClosePolicy.TERMINATE,
    )

note

execute_child_workflow() is a shortcut function for temporalio.workflow.start_child_workflow() plus awaiting the returned handle.

Continue-As-New

Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.

To Continue-As-New in Python, call the continue_as_new() function from inside your Workflow, which will stop the Workflow immediately and Continue-As-New. The positional argument is passed as the argument to the new Workflow Execution.

workflow.continue_as_new("your-workflow-argument")

Timers

A Workflow can set a durable timer for a fixed time period. In some SDKs, the function is called sleep(), and in others, it's called timer().

A Workflow can sleep for months. Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.

Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.

To set a Timer in Python, call the asyncio.sleep() function and pass the duration in seconds you want to wait before continuing.

await asyncio.sleep(5)
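
Since asyncio.sleep() takes seconds, longer durations are easier to read when written as a timedelta and converted. This is a minimal standard-library sketch; wait_for() is a hypothetical helper. Outside a Workflow it is an ordinary, non-durable sleep.

```python
import asyncio
from datetime import timedelta


async def wait_for(duration: timedelta) -> float:
    # asyncio.sleep() expects a number of seconds.
    seconds = duration.total_seconds()
    await asyncio.sleep(seconds)
    return seconds


slept = asyncio.run(wait_for(timedelta(milliseconds=10)))
print(slept)
print(timedelta(days=30).total_seconds())
```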

Schedule a Workflow

Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes.

Use any of the following actions to help Schedule a Workflow Execution and take control over your automation process.

Create

The create action enables you to create a new Schedule. Each Schedule is identified by a unique Schedule ID, which you can use to reference the Schedule in other Schedule commands.

To create a Scheduled Workflow Execution in Python, use the create_schedule() asynchronous method on the Client, and pass it the Schedule ID and a Schedule object. Set the action parameter to ScheduleActionStartWorkflow to start a Workflow Execution. Optionally, set the spec parameter to a ScheduleSpec to specify when the Schedule runs; for example, set its intervals field to a list of ScheduleIntervalSpec to run at a fixed interval. Other ScheduleSpec options include cron_expressions, skip, start_at, and jitter.

# ...
async def main():
    client = await Client.connect("localhost:7233")

    await client.create_schedule(
        "workflow-schedule-id",
        Schedule(
            action=ScheduleActionStartWorkflow(
                YourSchedulesWorkflow.run,
                "my schedule arg",
                id="schedules-workflow-id",
                task_queue="schedules-task-queue",
            ),
            spec=ScheduleSpec(
                intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
            ),
            state=ScheduleState(note="Here's a note on my Schedule."),
        ),
    )

Backfill

The backfill action executes Actions ahead of their specified time range. This command is useful when you need to execute a missed or delayed Action, or when you want to test the Workflow before its scheduled time.

To Backfill a Scheduled Workflow Execution in Python, use the backfill() asynchronous method on the Schedule Handle.

# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    now = datetime.utcnow()
    await handle.backfill(
        ScheduleBackfill(
            start_at=now - timedelta(minutes=10),
            end_at=now - timedelta(minutes=9),
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
        ),
    )
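
To reason about how many Actions a backfill window covers, the sketch below lists interval-aligned times inside a window using only the standard library. It assumes that interval-based Schedules match times of the form epoch + n × every (plus an optional offset) and that both ends of the window are inclusive. matching_times() and its parameters are hypothetical helper names, not SDK functions.

```python
from datetime import datetime, timedelta, timezone
from typing import List

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def matching_times(
    start_at: datetime,
    end_at: datetime,
    every: timedelta,
    offset: timedelta = timedelta(0),
) -> List[datetime]:
    """List times t in [start_at, end_at] with t == EPOCH + offset + n * every."""
    # Whole intervals elapsed before start_at (floor division yields an int).
    elapsed = (start_at - EPOCH - offset) // every
    candidate = EPOCH + offset + elapsed * every
    if candidate < start_at:
        candidate += every
    times = []
    while candidate <= end_at:
        times.append(candidate)
        candidate += every
    return times


window_start = datetime(2023, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
window_end = window_start + timedelta(minutes=5)
matches = matching_times(window_start, window_end, timedelta(minutes=2))
for t in matches:
    print(t.isoformat())
```

Under these assumptions, a one-minute window combined with a two-minute interval contains at most one matching time, so a backfill over such a window may start a single Workflow Execution or none.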

Delete

The delete action enables you to delete a Schedule. When you delete a Schedule, it does not affect any Workflows that were started by the Schedule.

To delete a Scheduled Workflow Execution in Python, use the delete() asynchronous method on the Schedule Handle.

async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )

    await handle.delete()

Describe

The describe action shows the current Schedule configuration, including information about past, current, and future Workflow Runs. This command is helpful when you want to get a detailed view of the Schedule and its associated Workflow Runs.

To describe a Scheduled Workflow Execution in Python, use the describe() asynchronous method on the Schedule Handle. You can get a complete list of the attributes of the Scheduled Workflow Execution from the ScheduleDescription class.

# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )

    desc = await handle.describe()

    print(f"Returns the note: {desc.schedule.state.note}")

List

The list action lists all the available Schedules. This command is useful when you want to view a list of all the Schedules and their respective Schedule IDs.

To list all schedules, use the list_schedules() asynchronous method on the Client. If a schedule is added or deleted, it may not be available in the list immediately.

# ...
async def main() -> None:
    client = await Client.connect("localhost:7233")
    async for schedule in await client.list_schedules():
        print(f"List Schedule Info: {schedule.info}.")

Pause

The pause action enables you to pause and unpause a Schedule. When you pause a Schedule, all the future Workflow Runs associated with the Schedule are temporarily stopped. This command is useful when you want to temporarily halt a Workflow due to maintenance or any other reason.

To pause a Scheduled Workflow Execution in Python, use the pause() asynchronous method on the Schedule Handle. You can pass a note to the pause() method to provide a reason for pausing the schedule.

# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )

    await handle.pause(note="Pausing the schedule for now")

Trigger

The trigger action triggers an immediate action with a given Schedule. By default, this action is subject to the Overlap Policy of the Schedule. This command is helpful when you want to execute a Workflow outside of its scheduled time.

To trigger a Scheduled Workflow Execution in Python, use the trigger() asynchronous method on the Schedule Handle.

# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )

    await handle.trigger()

Update

The update action enables you to update an existing Schedule. This command is useful when you need to modify the Schedule's configuration, such as changing the start time, end time, or interval.

To update a Schedule, create a function that takes ScheduleUpdateInput and returns ScheduleUpdate, and pass it as a callback to the update() method on the Schedule Handle. The callback builds the update from the current description. The following example updates the Schedule to use a new argument.

# ...
async def update_schedule_simple(input: ScheduleUpdateInput) -> ScheduleUpdate:
    schedule_action = input.description.schedule.action

    if isinstance(schedule_action, ScheduleActionStartWorkflow):
        schedule_action.args = ["my new schedule arg"]
    return ScheduleUpdate(schedule=input.description.schedule)

Temporal Cron Jobs

A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.

A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.

You can set each Workflow to repeat on a schedule with the cron_schedule option from either the start_workflow() or execute_workflow() asynchronous methods:

await client.start_workflow(
    "your_workflow_name",
    id="your-workflow-id",
    task_queue="your-task-queue",
    cron_schedule="* * * * *",
)
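
A five-field cron string such as the one above reads as minute, hour, day of month, month, and day of week. The following standard-library sketch labels the fields. split_cron() is a hypothetical helper, and it only handles the plain five-field form.

```python
from typing import Dict

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> Dict[str, str]:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


every_minute = split_cron("* * * * *")   # every minute
monday_noon = split_cron("0 12 * * 1")   # 12:00 every Monday
print(every_minute)
print(monday_noon)
```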