Issue
I have been using apscheduler. A recurring problem with the package is that if, for any reason, a running job hangs indefinitely (for example, if you create an infinite while loop inside it), it will block the whole process forever, as there is no time-limit option for added jobs.
The apscheduler maintainers have stated multiple times that they will not add a time limit, for various reasons (short explanation here); however, the problem still remains. You could create a job that is meant to run for days, only for it to stall because a web request gets no response, and apscheduler will wait for it indefinitely.
I've been trying to find a way to add this time limit to a job, for example by using the wrapt-timeout-decorator package. I would create a function that runs my job inside it and has a time limit, and then add this function to apscheduler. Unfortunately, the two packages collide with a circular import.
from wrapt_timeout_decorator.wrapt_timeout_decorator import timeout
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
class MyJob:  # implementation is unnecessary to show here
    ...


@timeout(dec_timeout=600, use_signals=False)
def run_job(job: MyJob) -> None:
    """Run ``job`` inside the function wrapped by the ``timeout`` decorator."""
    job.run()


job = MyJob()
scheduler = BackgroundScheduler(daemon=True)
# NOTE(review): `sheet_job` is not defined anywhere in this snippet; presumably
# `sheet_job.cron` holds the crontab expression for the job -- confirm.
scheduler.add_job(func=run_job, kwargs={"job": job}, trigger=CronTrigger.from_crontab(sheet_job.cron))
scheduler.start()
File "C:\Users...\AppData\Local\Programs\Python\Python39\lib\site-packages\multiprocess\context.py", line 62, in Pipe from .connection import Pipe ImportError: cannot import name 'Pipe' from partially initialized module 'multiprocess.connection' (most likely due to a circular import) (C:\Users...\AppData\Local\Programs\Python\Python39\lib\site-packages\multiprocess\connection.py)
I've also tried adding a self made timeout decorator, shown here, but I did not get the desired outcome.
My question is: Is there a way to add a time limit to an apscheduler job, or are there any other similar packages where creating a cron job with a time limit is possible, or do you know of any self-made solution? (the program will run on Windows).
Solution
Based on the number of answers and my own research, this is not currently possible with apscheduler. I have written my own quick implementation. The syntax is very similar to apscheduler: you just need to create a similar Scheduler object, add jobs to it with add_job, and then call start. For my needs this has solved the issue. I'm adding the implementation here as it may help somebody in the future.
from typing import Callable, Optional, Any
from datetime import datetime, timedelta
from croniter import croniter
from enum import Enum
import traceback
import threading
import ctypes
import time
class JobStatus(Enum):
    """Whether a job currently has a live worker thread."""

    # No worker thread exists, or the worker thread has already finished.
    NOT_RUNNING = "Not running"
    # The worker thread is alive.
    RUNNING = "Running"
class StoppableThread(threading.Thread):
    """Thread that another thread can ask to terminate.

    ``stop`` schedules a ``SystemExit`` inside the target thread through the
    CPython C API. NOTE: the exception is raised asynchronously, so a thread
    that is blocked inside a long-running C call may not stop until that call
    returns.
    """

    def get_id(self) -> Optional[int]:
        """Return the thread identifier, or ``None`` if the thread was never started."""
        # Honour an explicitly assigned ``_thread_id`` if one exists.
        if hasattr(self, '_thread_id'):
            return self._thread_id
        # ``ident`` is the public equivalent of searching ``threading._active``.
        return self.ident

    def stop(self) -> None:
        """Request termination of the thread by raising ``SystemExit`` in it.

        Prints a diagnostic instead of raising when the thread cannot be
        stopped (never started, already finished, or the C API call failed).
        """
        thread_id = self.get_id()
        if thread_id is None:
            print("Failed find thread id. Unable to stop thread.")
            return
        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        # The C function takes an ``unsigned long``; a bare Python int would be
        # converted to a C ``int`` and could be truncated on 64-bit platforms.
        c_thread_id = ctypes.c_ulong(thread_id)
        res = set_async_exc(c_thread_id, ctypes.py_object(SystemExit))
        if res == 0:
            # No thread state matched the id: the thread is not running.
            print("Thread is not running. Nothing to stop.")
        elif res > 1:
            # More than one thread state was modified; undo the request by
            # passing NULL as the exception.
            set_async_exc(c_thread_id, None)
            print("Failed to stop thread.")
class JobRunner:
    """Run a function on a cron schedule in a worker thread, with an optional time limit.

    ``update`` is designed to be polled: each call either enforces the time
    limit on a run that is still in progress, or starts a new run once the
    next scheduled time has been reached.
    """

    def __init__(
        self,
        function: Callable[..., None],
        cron_tab: str,
        function_kwargs: Optional[dict[str, Any]] = None,
        timeout_minutes: Optional[int] = None,
    ) -> None:
        """Store the job definition and compute its first scheduled run time.

        Args:
            function: Callable executed in a worker thread on every run.
            cron_tab: Cron expression passed to ``croniter``.
            function_kwargs: Keyword arguments for ``function``; ``None`` means no arguments.
            timeout_minutes: Minutes a single run may take before it is stopped;
                ``None`` disables the time limit.
        """
        self.function = function
        self.cron_tab = cron_tab
        self.function_kwargs = function_kwargs if function_kwargs is not None else {}
        self.timeout_minutes = timeout_minutes
        self.next_run_time = datetime.now()
        self.next_timeout_time = None if timeout_minutes is None else datetime.now() + timedelta(minutes=timeout_minutes)
        self._job_thread: Optional[StoppableThread] = None
        self._update_next_run_time()

    def update(self) -> None:
        """Enforce the time limit on a running job, or start the job if it is due."""
        if self.get_job_status() == JobStatus.RUNNING:
            # Stop the run only once its deadline has passed. The comparison
            # must be ">=": with "<" the job is killed on the first poll while
            # still within its limit, and never after the limit has expired.
            if self.timeout_minutes is not None and datetime.now() >= self.next_timeout_time:
                print(f"Job stopped due to timeout after not finishing in {self.timeout_minutes} minutes.")
                self._job_thread.stop()
                # NOTE(review): join() has no timeout, so this blocks the caller
                # until the worker thread actually terminates.
                self._job_thread.join()
                self._job_thread = None
            # Never start a second run while one is (or just was) in progress.
            return
        if datetime.now() < self.next_run_time:
            return
        self._job_thread = StoppableThread(target=self.function, kwargs=self.function_kwargs)
        self._job_thread.start()
        self._update_next_run_time()
        self._update_next_timeout()

    def get_job_status(self) -> JobStatus:
        """Return ``RUNNING`` if a worker thread exists and is alive, else ``NOT_RUNNING``."""
        if self._job_thread is not None and self._job_thread.is_alive():
            return JobStatus.RUNNING
        return JobStatus.NOT_RUNNING

    def _update_next_run_time(self) -> None:
        """Set ``next_run_time`` to the next cron match after the current time."""
        cron = croniter(self.cron_tab, datetime.now())
        self.next_run_time = cron.get_next(datetime)

    def _update_next_timeout(self) -> None:
        """Set the deadline of the run that was just started, if a limit is configured."""
        if self.timeout_minutes is not None:
            self.next_timeout_time = datetime.now() + timedelta(minutes=self.timeout_minutes)
class Scheduler:
    """Poll a list of ``JobRunner`` objects in an endless loop."""

    def __init__(self) -> None:
        """Create a scheduler with no jobs registered."""
        self._jobs: list[JobRunner] = []

    def add_job(
        self,
        function: Callable[..., None],
        cron_tab: str,
        function_kwargs: Optional[dict[str, Any]] = None,
        timeout_minutes: Optional[int] = None,
    ) -> None:
        """Register a job; the arguments are passed unchanged to ``JobRunner``.

        Args:
            function: Callable to execute on every run.
            cron_tab: Cron expression describing when the job runs.
            function_kwargs: Keyword arguments for ``function``.
            timeout_minutes: Optional time limit for a single run, in minutes.
        """
        self._jobs.append(JobRunner(function, cron_tab, function_kwargs, timeout_minutes))

    def start(self, poll_interval_seconds: float = 1.0) -> None:
        """Poll every registered job forever; this call does not return.

        Args:
            poll_interval_seconds: Seconds to sleep before each polling pass.
        """
        while True:
            time.sleep(poll_interval_seconds)
            for job_runner in self._jobs:
                # Guard each job separately so that an error while updating one
                # job does not skip the remaining jobs in this pass.
                try:
                    job_runner.update()
                except Exception:
                    print(f"An error occured while running one of the jobs: {traceback.format_exc()}")
Answered By - CaptainCsaba Answer Checked By - Marie Seifert (WPSolving Admin)