Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:caption: TransformationCleaningAgent options

"""

# # imports
import ast
import errno
Expand Down Expand Up @@ -144,10 +145,13 @@ def initialize(self):
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Comment thread
fstagni marked this conversation as resolved.

return S_OK()

Expand Down
44 changes: 39 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE

Expand All @@ -55,7 +54,10 @@ def __init__(self, *args, **kwargs):

# clients
self.jobDB = None
self.taskQueueDB = None
self.pilotAgentsDB = None
self.sandboxDB = None
self.storageManagementDB = None

self.maxJobsAtOnce = 500
self.prodTypes = []
Expand All @@ -67,8 +69,33 @@ def __init__(self, *args, **kwargs):
def initialize(self):
"""Sets defaults"""

self.jobDB = JobDB()
self.sandboxDB = SandboxMetadataDB()
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
self.jobDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
if not result["OK"]:
return result
self.sandboxDB = result["Value"]()

try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Comment thread
fstagni marked this conversation as resolved.

agentTSTypes = self.am_getOption("ProductionTypes", [])
if agentTSTypes:
Expand Down Expand Up @@ -239,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False):
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
result = wmsClient.removeJob(jobsList)
else:
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
result = kill_delete_jobs(
RIGHT_DELETE,
jobsList,
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)
if not result["OK"]:
self.log.error(
f"Could not {'remove' if remove else 'delete'} jobs",
Expand Down
11 changes: 7 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ def initialize(self):
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Comment thread
fstagni marked this conversation as resolved.

# getting parameters

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Test class for Job Cleaning Agent
"""
"""Test class for Job Cleaning Agent"""

from unittest.mock import MagicMock

import pytest
Expand All @@ -16,6 +16,9 @@
mockNone = MagicMock()
mockNone.return_value = None
mockJMC = MagicMock()
mockJobDB = MagicMock()
mockJobDB.getDistinctJobAttributes = mockReply
mockJobDB.selectJobs = mockReply


@pytest.fixture
Expand All @@ -27,16 +30,21 @@ def jca(mocker):
create=True,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)

def mock_load_object(module_path, class_name):
mocks = {
"JobDB": mockJobDB,
"TaskQueueDB": MagicMock(),
"PilotAgentsDB": MagicMock(),
"SandboxMetadataDB": MagicMock(),
"StorageManagementDB": MagicMock(),
}
return {"OK": True, "Value": lambda: mocks[class_name]}

mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
side_effect=mock_load_object,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)

jca = JobCleaningAgent()
jca.log = gLogger
jca.log.setLevel("DEBUG")
Expand Down Expand Up @@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)

def mock_load_object(module_path, class_name):
mocks = {
"JobDB": MagicMock(),
"TaskQueueDB": MagicMock(),
"PilotAgentsDB": MagicMock(),
"SandboxMetadataDB": MagicMock(),
"StorageManagementDB": MagicMock(),
}
return {"OK": True, "Value": lambda: mocks[class_name]}

mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
side_effect=mock_load_object,
)
jobCleaningAgent = JobCleaningAgent()

jobCleaningAgent.log = gLogger
jobCleaningAgent.log.setLevel("DEBUG")
jobCleaningAgent._AgentModule__configDefaults = mockAM
Expand Down
15 changes: 9 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition

Expand Down Expand Up @@ -96,11 +96,6 @@ def kill_delete_jobs(
if not result["OK"]:
return result
pilotagentsdb = result["Value"]()
if storagemanagementdb is None:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()

badIDs = []

Expand Down Expand Up @@ -133,6 +128,14 @@ def kill_delete_jobs(
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]

if stagingJobList:
if storagemanagementdb is None:
result = ObjectLoader().loadObject(
"StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB"
)
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does happen here if an exception is raised?

Copy link
Copy Markdown
Contributor Author

@fstagni fstagni Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a real exception, which I think it makes sense that it is propagated: if that happens it means that:

  • there are jobs in Staging status (which can only be set by the StorageManagementSystem)
  • The StorageManagementDB is removed.

I do not see how this could ever happen, unless someone installs the StorageManagementSystem only to remove it later.


gLogger.info("Going to send killing signal to stager as well!")
result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList)
if not result["OK"]:
Expand Down
56 changes: 28 additions & 28 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
""" JobManagerHandler is the implementation of the JobManager service
in the DISET framework
"""JobManagerHandler is the implementation of the JobManager service
in the DISET framework

The following methods are available in the Service interface
The following methods are available in the Service interface

submitJob()
rescheduleJob()
deleteJob()
killJob()
submitJob()
rescheduleJob()
deleteJob()
killJob()

"""

from pydantic import ValidationError

from DIRAC import S_ERROR, S_OK
Expand All @@ -22,6 +23,7 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
Expand All @@ -30,7 +32,6 @@
RIGHT_SUBMIT,
JobPolicy,
)
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
Expand All @@ -44,34 +45,33 @@ class JobManagerHandlerMixin:
@classmethod
def initializeHandler(cls, serviceInfoDict):
"""Initialization of DB objects and OptimizationMind"""
try:
Comment thread
fstagni marked this conversation as resolved.
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
cls.jobDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
cls.jobDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
if not result["OK"]:
return result
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
if not result["OK"]:
return result
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
cls.taskQueueDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)

try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
cls.storageManagementDB = result["Value"](parentLogger=cls.log)

except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")
except RuntimeError:
cls.storageManagementDB = None
Comment thread
fstagni marked this conversation as resolved.

cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
result = cls.msgClient.connect(JobManager=True)
Expand Down
Loading