claude-meta/memory/projects/kbsV3/job-system.md


KbsV3 Job System: How It Works

Job Lifecycle (end-to-end)

  1. Job created: via the UI (/adsyncjob), a REST controller, or Kbs3SchedulerService
    • Sets JobDto.JobType, InputData (JSON string), JobSubtype, CreatedBy
    • Persisted to the DB via IJobDataService.AddJob()
  2. Job picked up: a background poller in Kbs3ServerService finds pending jobs
  3. Start check: JobStartService.StartJob(job) resolves JobRequirementFactoryService and checks CanStart()
  4. gRPC call: the server calls the microservice via IJobSubscribeService.Subscribe(JobDataRequest)
    • JobDataRequest.InputData = the JSON string from step 1
  5. Worker execution: the microservice runs ILongRunningJobService.Run(request, ct) or an IKbsJob host
  6. Streaming: protocol updates stream back via gRPC as IAsyncEnumerable<JobProtocolUpdate>
  7. Persistence: the server saves Protocol/ProtocolLine entities

Creating a Job Programmatically

From UI page (direct DB)

var job = new JobDto();
job.JobType = JobType.DeleteInactiveUsers;
job.InputData = "{\"DryRun\":true}";  // raw JSON string
job.JobSubtype = JobSubtypes.SyncZadGroup;
await JobService.AddJob(job);  // JobService is the injected IJobDataService

From microservice / scheduler (via gRPC IApiComposer)

await apiComposer.CreateKnownJob(new CreateJobRequest
{
    Jobtype = JobType.DeleteInactiveUsers.GetStringValue(),  // "DeleteInactiveUsers"
    PayLoad = "{\"DryRun\":false}",  // string → JobDto.InputData as-is
    Priority = 5
});

PayLoad (string) is stored directly as JobDto.InputData in CreateAndEnqueueJob (no extra serialization). No IKnownJob definition is required for this approach.
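
Because PayLoad is passed through untouched, the caller is responsible for producing valid JSON. A minimal sketch of building that string from a typed object instead of hand-escaping it. The DeleteInactiveUsersInput record and PayloadBuilder class are hypothetical names for illustration, and System.Text.Json is used here only to keep the sketch dependency-free (the workers in these notes read input with JsonConvert):

```csharp
using System.Text.Json;

// Hypothetical input type; only the DryRun flag appears in these notes.
public sealed record DeleteInactiveUsersInput(bool DryRun);

public static class PayloadBuilder
{
    // Serializes the input to the JSON string that is passed as PayLoad.
    // System.Text.Json keeps property names as declared, so the key is "DryRun".
    public static string Build(DeleteInactiveUsersInput input) =>
        JsonSerializer.Serialize(input);
}
```

PayloadBuilder.Build(new DeleteInactiveUsersInput(false)) could then be assigned to CreateJobRequest.PayLoad in place of the string literal.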

Multi-Service Jobs (e.g. DeleteInactiveUsers)

  • Multiple microservices (AD, Azure, DW, Helpdesk, SQL) each register a worker for the same JobType
  • All fire automatically when one job with that JobType is created: the server broadcasts to all registered services
  • No special routing is needed; a single CreateKnownJob call triggers all of them
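
The fan-out can be pictured as a lookup from JobType string to every service that registered a worker for it. This is a toy model only: WorkerRegistry, Register, and TargetsFor are hypothetical names, and the real server's registration and broadcast mechanism is not shown in these notes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of "one JobType, many registered services" (not the real server code).
public sealed class WorkerRegistry
{
    private readonly Dictionary<string, List<string>> _servicesByJobType =
        new(StringComparer.Ordinal);

    // Called once per microservice that hosts a worker for the job type.
    public void Register(string jobType, string serviceName)
    {
        if (!_servicesByJobType.TryGetValue(jobType, out var services))
            _servicesByJobType[jobType] = services = new List<string>();
        services.Add(serviceName);
    }

    // Every registered service is a target; an unknown job type yields an empty list.
    public IReadOnlyList<string> TargetsFor(string jobType) =>
        _servicesByJobType.TryGetValue(jobType, out var services)
            ? services.ToList()
            : new List<string>();
}
```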

ILongRunningJobService Workers

  • Dedicated JobType enum entry (e.g. JobType.DeleteInactiveUsers)
  • Implement Run(JobDataRequest request, CancellationToken ct)
  • Read input: JsonConvert.DeserializeObject<T>(request.InputData)
  • Auto-registered as keyed service by JobType string value in IntegrationServiceBase.AutoInjectServices()
  • Log via protocolLogger.LogHeadline / LogInfo / LogWarn / LogError
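
A rough sketch of such a worker, under several assumptions: JobDataRequest, ILongRunningJobService, and IProtocolLogger below are simplified stand-ins (the notes name them but do not show their definitions), the Task return type of Run is a guess, and System.Text.Json replaces JsonConvert so the sketch has no package dependency.

```csharp
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the real KbsV3 types; their actual shapes may differ.
public sealed class JobDataRequest { public string InputData { get; set; } = ""; }
public interface ILongRunningJobService { Task Run(JobDataRequest request, CancellationToken ct); }
public interface IProtocolLogger { void LogHeadline(string text); void LogInfo(string text); }

// Hypothetical input type matching the {"DryRun":...} payload used in these notes.
public sealed record DeleteInactiveUsersInput(bool DryRun);

public sealed class DeleteInactiveUsersWorker : ILongRunningJobService
{
    private readonly IProtocolLogger _protocolLogger;

    public DeleteInactiveUsersWorker(IProtocolLogger protocolLogger) =>
        _protocolLogger = protocolLogger;

    // Assumed signature: the notes give the parameters but not the return type.
    public Task Run(JobDataRequest request, CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
        var input = ParseInput(request.InputData);
        _protocolLogger.LogHeadline("Delete inactive users");
        _protocolLogger.LogInfo(input.DryRun ? "Dry run: no changes are made" : "Live run");
        return Task.CompletedTask;
    }

    // Falls back to a dry run when no input is supplied (a defensive choice for this sketch).
    public static DeleteInactiveUsersInput ParseInput(string inputData) =>
        string.IsNullOrWhiteSpace(inputData)
            ? new DeleteInactiveUsersInput(DryRun: true)
            : JsonSerializer.Deserialize<DeleteInactiveUsersInput>(inputData)
              ?? new DeleteInactiveUsersInput(DryRun: true);
}
```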

IKbsJob Workers (Kbs3SqlService pattern)

  • All share JobType.CallServiceHost
  • Auto-registered by class name (e.g. "SyncOpexPurchaseOrdersService")
  • IKnownJob.ServiceName = "modulename.ClassName" → host resolves last segment to find worker
  • Do NOT add new JobType enum entries for these
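
The "last segment" rule for ServiceName can be illustrated with a small helper. ServiceNameResolver and WorkerKey are hypothetical names; the host's actual lookup code is not shown in these notes.

```csharp
using System;

public static class ServiceNameResolver
{
    // Returns the part after the last '.', e.g. "modulename.ClassName" -> "ClassName".
    // A name without a dot is returned unchanged.
    public static string WorkerKey(string serviceName)
    {
        if (string.IsNullOrWhiteSpace(serviceName))
            throw new ArgumentException("Service name must not be empty.", nameof(serviceName));

        var lastDot = serviceName.LastIndexOf('.');
        return serviceName.Substring(lastDot + 1);
    }
}
```

For example, WorkerKey("modulename.SyncOpexPurchaseOrdersService") yields "SyncOpexPurchaseOrdersService", which matches the class-name key the worker was registered under.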

Kbs3SchedulerService

Adding a new scheduled job

  1. Create MyJob : BaseJob in Kbs3SchedulerService/Jobs/
    • Constructor: (ILogger<MyJob> logger, IApiComposer apiComposer) : base(logger)
    • Implement override string Name and override async Task Execute(CancellationToken, KeyValue[] settings)
    • Auto-registered via AddAllTypesOf<IJob>(); no manual DI registration is needed
  2. Add entry to appsettings.Development.json (IsEnabled: false) and appsettings.Production.json (IsEnabled: true) under SchedulerSettings.Items
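
Step 1 might look roughly like the following. All types above the job class are simplified stand-ins so the sketch compiles on its own (the real ILogger<T>, BaseJob, IApiComposer, CreateJobRequest, and KeyValue live in the KbsV3 code base and may differ), and the "DryRun" settings key is a made-up example.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// --- Stand-ins; the real definitions are not shown in these notes. ---
public interface ILogger<T> { }
public sealed class KeyValue { public string Key { get; set; } = ""; public string Value { get; set; } = ""; }
public sealed class CreateJobRequest
{
    public string Jobtype { get; set; } = "";
    public string PayLoad { get; set; } = "";
    public int Priority { get; set; }
}
public interface IApiComposer { Task CreateKnownJob(CreateJobRequest request); }
public abstract class BaseJob
{
    protected BaseJob(object logger) { }
    public abstract string Name { get; }
    public abstract Task Execute(CancellationToken ct, KeyValue[] settings);
}

// --- The scheduled job itself ---
public sealed class DeleteInactiveUsersJob : BaseJob
{
    private readonly IApiComposer _apiComposer;

    public DeleteInactiveUsersJob(ILogger<DeleteInactiveUsersJob> logger, IApiComposer apiComposer)
        : base(logger) => _apiComposer = apiComposer;

    public override string Name => "DeleteInactiveUsers";

    public override async Task Execute(CancellationToken ct, KeyValue[] settings)
    {
        ct.ThrowIfCancellationRequested();

        // "DryRun" is a hypothetical key in the item's Settings array.
        var dryRun = settings.FirstOrDefault(s => s.Key == "DryRun")?.Value == "true";

        await _apiComposer.CreateKnownJob(new CreateJobRequest
        {
            Jobtype = "DeleteInactiveUsers",
            PayLoad = dryRun ? "{\"DryRun\":true}" : "{\"DryRun\":false}",
            Priority = 5
        });
    }
}
```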

Schedule patterns

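| Type | Meaning  | Key fields                                    |
|------|----------|-----------------------------------------------|
| 1    | Daily    | Interval, StartTime                           |
| 2    | Weekly   | Interval, DaysOfWeek: ["Monday","Friday",...] |
| 3    | Monthly  | DaysOfMonth                                   |
| 5    | Interval | CustomInterval: "HH:mm:ss"                    |
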
  • StartTime on the root item = time of day for execution
  • Settings array → read in Execute via settings.FirstOrDefault(s => s.Key == "Foo")?.Value
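
Since a missing key makes the FirstOrDefault lookup return null, reads from the Settings array usually want a fallback. A small sketch of helpers for that, assuming KeyValue exposes string Key and Value properties (the stand-in below); SettingsExtensions, GetSetting, and GetBool are hypothetical names.

```csharp
using System.Linq;

// Stand-in for the scheduler's KeyValue type; only Key and Value are used here.
public sealed class KeyValue { public string Key { get; set; } = ""; public string Value { get; set; } = ""; }

public static class SettingsExtensions
{
    // Returns the value for the key, or the fallback when the key (or the array) is missing.
    public static string GetSetting(this KeyValue[] settings, string key, string fallback) =>
        settings?.FirstOrDefault(s => s.Key == key)?.Value ?? fallback;

    // Parses "true"/"false" (case-insensitive); anything else yields the fallback.
    public static bool GetBool(this KeyValue[] settings, string key, bool fallback) =>
        bool.TryParse(settings.GetSetting(key, null), out var parsed) ? parsed : fallback;
}
```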

Internal flow

SchedulerHostedService → SeriesSchedulerManager → SeriesJobScheduler (infinite loop) → _schedule.GetNextExecution(DateTime.Now) → Task.Delay(...) → job.ExecuteAsync(...)
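
The loop at the end of that chain can be sketched as follows. ISchedule, FixedIntervalSchedule, and JobSchedulerLoop are hypothetical names for illustration; only GetNextExecution, Task.Delay, and the execute step are taken from the flow above, and the real SeriesJobScheduler may handle errors and cancellation differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the schedule abstraction; only GetNextExecution is named in the notes.
public interface ISchedule { DateTime GetNextExecution(DateTime now); }

// Simplest possible schedule: always "now + interval" (compare pattern type 5).
public sealed class FixedIntervalSchedule : ISchedule
{
    private readonly TimeSpan _interval;
    public FixedIntervalSchedule(TimeSpan interval) => _interval = interval;
    public DateTime GetNextExecution(DateTime now) => now + _interval;
}

public sealed class JobSchedulerLoop
{
    private readonly ISchedule _schedule;
    private readonly Func<CancellationToken, Task> _execute;

    public JobSchedulerLoop(ISchedule schedule, Func<CancellationToken, Task> execute)
    {
        _schedule = schedule;
        _execute = execute;
    }

    public async Task RunAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var now = DateTime.Now;
            var wait = _schedule.GetNextExecution(now) - now;
            if (wait < TimeSpan.Zero) wait = TimeSpan.Zero; // Task.Delay rejects negative delays

            try
            {
                await Task.Delay(wait, ct); // sleep until the next slot
                await _execute(ct);         // run the job
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                break; // normal shutdown
            }
        }
    }
}
```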

InputData / Payload flow

CreateJobRequest.PayLoad (string)
  → ApiComposerService.CreateKnownJob()
  → CreateJobService.CreateAndEnqueueJob()
  → if (payload is string) input = payload as string   ← no extra serialization!
  → JobDto.InputData
  → JobDataRequest.InputData (sent to microservice via gRPC)
  → worker reads request.InputData
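
The string check in that chain matters because serializing a value that is already a JSON string would wrap it in quotes and escape its contents, and the worker could no longer deserialize it into its input type. A minimal sketch of the idea: InputDataMapper and ToInputData are hypothetical names, and how the real CreateAndEnqueueJob treats non-string payloads is an assumption here.

```csharp
using System.Text.Json;

public static class InputDataMapper
{
    // A string payload is used as-is; anything else is serialized once.
    public static string ToInputData(object payload) =>
        payload is string text
            ? text
            : JsonSerializer.Serialize(payload); // assumed handling of non-string payloads
}
```

With this shape, ToInputData("{\"DryRun\":true}") and ToInputData(new { DryRun = true }) both produce the same InputData string.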