Worker API Reference

class lifeblood.worker.Worker(scheduler_addr: AddressChain, *, child_priority_adjustment: ProcessPriorityAdjustment = ProcessPriorityAdjustment.NO_CHANGE, worker_type: WorkerType = WorkerType.STANDARD, config: Optional[Config] = None, singleshot: bool = False, scheduler_ping_interval: float = 10, scheduler_ping_miss_threshold: int = 6, worker_id: Optional[int] = None, pool_address: Optional[AddressChain] = None)
__init__(scheduler_addr: AddressChain, *, child_priority_adjustment: ProcessPriorityAdjustment = ProcessPriorityAdjustment.NO_CHANGE, worker_type: WorkerType = WorkerType.STANDARD, config: Optional[Config] = None, singleshot: bool = False, scheduler_ping_interval: float = 10, scheduler_ping_miss_threshold: int = 6, worker_id: Optional[int] = None, pool_address: Optional[AddressChain] = None)
Parameters:
  • scheduler_addr

  • worker_type

  • singleshot

async _cleanup_extra_files()

Clean up extra files transferred with the task.

async cancel_task()
async delete_logs(invocation_id: int)
async deliver_invocation_message(destination_invocation_id: int, destination_addressee: str, source_invocation_id: Optional[int], message_body: bytes, addressee_timeout: float = 90.0)

Deliver a message to a task.

The idea is to deliver ONLY when the message is being waited for, so queues are added and removed by the receiver, not by this deliver method. The current implementation is NOT thread safe: it relies on async to separate the important regions.
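The receiver-owned queue idea described above can be sketched with plain asyncio. This is a minimal illustration, not Lifeblood's actual implementation: the class `AddresseeMailbox`, its method names, the polling interval, the single-waiter-per-addressee restriction, and the boolean return value of `deliver` are all assumptions made for the example.

```python
import asyncio
from typing import Dict, Optional, Tuple

Message = Tuple[Optional[int], bytes]


class AddresseeMailbox:
    """Hypothetical stand-in for receiver-owned message queues."""

    def __init__(self) -> None:
        # addressee name -> queue; an entry exists only while a receiver waits
        self._queues: Dict[str, "asyncio.Queue[Message]"] = {}

    async def wait(self, addressee: str, timeout: float = 30) -> Message:
        # The receiver creates the queue (one waiter per addressee assumed)...
        queue: "asyncio.Queue[Message]" = asyncio.Queue()
        self._queues[addressee] = queue
        try:
            return await asyncio.wait_for(queue.get(), timeout)
        finally:
            # ...and the receiver removes it, whether or not a message came.
            self._queues.pop(addressee, None)

    async def deliver(self, addressee: str, source_id: Optional[int],
                      body: bytes, addressee_timeout: float = 90.0) -> bool:
        # The sender never creates a queue: it only waits for one to appear.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + addressee_timeout
        while addressee not in self._queues:
            if loop.time() >= deadline:
                return False  # nobody started waiting in time
            await asyncio.sleep(0.01)
        # No await between the check above and this put, so no other
        # coroutine can run in between (safe on one event loop only).
        self._queues[addressee].put_nowait((source_id, body))
        return True


async def demo():
    box = AddresseeMailbox()
    receiver = asyncio.ensure_future(box.wait("render", timeout=1))
    delivered = await box.deliver("render", 7, b"hello", addressee_timeout=1)
    received = await receiver
    undelivered = await box.deliver("nobody", 7, b"x", addressee_timeout=0.05)
    return delivered, received, undelivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the check for a waiting queue and the `put_nowait` call run without an intervening `await`, the event loop cannot switch coroutines between them. That is the sense in which such code relies on async rather than locks, and why it would not be safe to call from several threads.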

get_log_filepath(level, invocation_id: int = None)
is_started()
is_stopping() -> bool

Returns True if stop was called on the worker, so the worker is closed or in the process of closing.

is_task_running() -> bool
message_processor() -> WorkerMessageProcessor
async run_task(task: Invocation, report_to: AddressChain)
running_invocation() -> Optional[Invocation]
scheduler_message_address() -> AddressChain
async scheduler_pinger()

Ping the scheduler once in a while. If too many pings are missed, close the worker and wait for new broadcasts.
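A ping loop with a miss threshold, in the spirit of the `scheduler_ping_interval` and `scheduler_ping_miss_threshold` constructor parameters, might look like the sketch below. The function `ping_until_lost`, the `ping` callable, and the choice to count only consecutive misses are assumptions for illustration; the source does not say how the worker counts misses.

```python
import asyncio
from typing import Awaitable, Callable


async def ping_until_lost(ping: Callable[[], Awaitable[bool]],
                          interval: float = 10,
                          miss_threshold: int = 6) -> int:
    """Ping repeatedly until `miss_threshold` pings fail in a row.

    Returns the total number of pings sent. `ping` is a hypothetical
    coroutine function that returns True when the scheduler answered.
    """
    misses = 0
    sent = 0
    while misses < miss_threshold:
        await asyncio.sleep(interval)
        sent += 1
        try:
            ok = await ping()
        except (OSError, asyncio.TimeoutError):
            ok = False  # treat connection errors and timeouts as a miss
        # A successful ping resets the counter (assumed behaviour).
        misses = 0 if ok else misses + 1
    return sent
```

With the defaults shown in the constructor signature (interval 10, threshold 6), a loop like this would give up roughly a minute after the scheduler stops answering. The caller would then close the worker and go back to waiting for broadcasts.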

async start()
stop()
async task_finished()

Called when the current process finishes.

task_status() -> Optional[float]
wait_till_starts()
async wait_till_stops()
worker_message_address() -> DirectAddress
async worker_task_addressee_wait(addressee: str, timeout: float = 30) -> Tuple[int, bytes]

Wait for a data message addressed to addressee to be delivered.

Returns:

sender invocation id, message body