flask_rq2.job.FlaskJob is nice, but initializing the app each time can be time-consuming if the app has to make a database connection, initialize some other extensions, etc. It'd be great if this were handled by a custom Worker class. I gave it a try and while I haven't tested extensively this seems to work:
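
    from flask import current_app
    from flask.cli import ScriptInfo
    from rq import Worker


    class FlaskWorker(Worker):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Load the app once per worker instead of once per job.
            self.script_info = ScriptInfo()
            self.app = self.load_app()

        def perform_job(self, job, queue):
            # Run every job inside the already-initialized app's context.
            with self.app.app_context():
                return super().perform_job(job, queue)

        def load_app(self):
            if current_app:
                # Keep the real app object, not the context-bound proxy.
                app = current_app._get_current_object()
            else:
                app = self.script_info.load_app()
            return app

If this seems useful to anyone else, I'd be happy to submit a PR.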
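
To illustrate the motivation, here is a rough, standard-library-only sketch of the difference between building the app for every job and building it once per worker. `FakeApp`, `run_per_job`, and `PerWorkerRunner` are hypothetical stand-ins made up for this example. They are not part of Flask, RQ, or Flask-RQ2, and they only count how often the "app" gets constructed.

```python
from contextlib import contextmanager


class FakeApp:
    """Hypothetical stand-in for a Flask app; counts how often it is built."""

    created = 0

    def __init__(self):
        # Pretend this is the expensive part (DB connection, extensions, ...).
        FakeApp.created += 1

    @contextmanager
    def app_context(self):
        # Mimics the shape of Flask's app.app_context() for this sketch only.
        yield self


def run_per_job(jobs):
    """Per-job setup: a new app is built for every job."""
    results = []
    for job in jobs:
        app = FakeApp()
        with app.app_context():
            results.append(job())
    return results


class PerWorkerRunner:
    """Per-worker setup: the app is built once and reused for each job."""

    def __init__(self):
        self.app = FakeApp()

    def perform_job(self, job):
        with self.app.app_context():
            return job()


jobs = [lambda: 1, lambda: 2, lambda: 3]

FakeApp.created = 0
per_job_results = run_per_job(jobs)
per_job_builds = FakeApp.created

FakeApp.created = 0
runner = PerWorkerRunner()
per_worker_results = [runner.perform_job(job) for job in jobs]
per_worker_builds = FakeApp.created
```

Both approaches produce the same job results. The only difference in this sketch is how many times the app is constructed, which is the cost the proposed worker class is meant to avoid.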