Skip to content

Commit

Permalink
Merge pull request #153 from NCronJob-Dev/ntk/instant_return
Browse files Browse the repository at this point in the history
Make `IInstantJobRegistry` members return the job correlation id
  • Loading branch information
nulltoken authored Jan 2, 2025
2 parents a46fa6e + 3c6c991 commit 7334b45
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ All notable changes to **NCronJob** will be documented in this file. The project
- Expose typed version of `DisableJob()`. Added in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).
- Expose typed version of `EnableJob()`. Added in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).

### Changed

- Teach `IInstantJobRegistry` members to return the job correlation id. Changed in [#153](https://github.com/NCronJob-Dev/NCronJob/issues/153), by [@nulltoken](https://github.com/nulltoken).

### Fixed

- Make `RemoveJob<TJob>()` and `RemoveJob(Type)` remove all jobs of the given type. Fixed in [#151](https://github.com/NCronJob-Dev/NCronJob/issues/151), by [@nulltoken](https://github.com/nulltoken).
Expand Down
12 changes: 12 additions & 0 deletions docs/features/instant-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ app.MapPost("/send-email", (RequestDto dto, IInstantJobRegistry jobRegistry) =>
return TypedResults.Ok();
});
```

## Instrumentation

All members of the `IInstantJobRegistry` interface return the correlation id of the triggered job (See [*"Tracing requests of dependencies via `CorrelationId`"*](./model-dependencies.md#tracing-requests-of-dependencies-via-correlationid).).

```csharp
Guid oneCorrelationId = jobRegistry.RunInstantJob<MyJob>();

Guid anotherCorrelationId = jobRegistry.RunScheduledJob<MyJob>(TimeSpan.FromMinutes(5));

[...]
```
76 changes: 44 additions & 32 deletions src/NCronJob/Registry/IInstantJobRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public interface IInstantJobRegistry
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// </summary>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// This is a fire-and-forget process, the Job will be queued with high priority and run in the background. The contents of <paramref name="parameter" />
/// are not serialized and deserialized. It is the reference to the <paramref name="parameter"/>-object that gets passed in.
Expand All @@ -24,7 +25,7 @@ public interface IInstantJobRegistry
/// instantJobRegistry.RunInstantJob&lt;MyJob&gt;(new MyParameterObject { Foo = "Bar" });
/// </code>
/// </example>
void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
Guid RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -36,15 +37,17 @@ void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = def
/// </remarks>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunInstantJob(Delegate jobDelegate, CancellationToken token = default);
/// <returns>The job correlation id.</returns>
Guid RunInstantJob(Delegate jobDelegate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>.
/// </summary>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -53,7 +56,8 @@ void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, Cancellatio
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -62,47 +66,51 @@ void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, C
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);
Guid RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed at the given <paramref name="startDate"/>.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);
Guid RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>. The job will not be queued into the JobQueue, but executed directly.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);
Guid ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed at the given <paramref name="startDate"/>. The job will not be queued into the JobQueue, but executed directly.
/// </summary>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="startDate">The starting point when the job will be executed.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// The <paramref name="jobDelegate"/> delegate supports, like <see cref="NCronJobExtensions.AddNCronJob(IServiceCollection, Delegate, string, TimeZoneInfo)"/>, that services can be retrieved dynamically.
/// Also, the <see cref="CancellationToken"/> can be retrieved in this way.
/// </remarks>
void ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);
Guid ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default);

/// <summary>
/// Runs a job that will be executed after the given <paramref name="delay"/>. The job will not be queued into the JobQueue, but executed directly.
Expand All @@ -111,14 +119,16 @@ void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, C
/// <param name="delay">The delay until the job will be executed.</param>
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
void ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
/// <returns>The job correlation id.</returns>
Guid ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
/// Runs an instant job to the registry, which will be executed even if the job is not registered and the concurrency is exceeded.
/// <param name="parameter">An optional parameter that is passed down as the <see cref="JobExecutionContext"/> to the job.</param>
/// <param name="token">An optional token to cancel the job.</param>
/// </summary>
/// <returns>The job correlation id.</returns>
/// <remarks>
/// This is a fire-and-forget process, the Job will be run immediately in the background. The contents of <paramref name="parameter" />
/// are not serialized and deserialized. It is the reference to the <paramref name="parameter"/>-object that gets passed in.
Expand All @@ -129,7 +139,7 @@ void ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, Cancel
/// instantJobRegistry.RunInstantJob&lt;MyJob&gt;(new MyParameterObject { Foo = "Bar" });
/// </code>
/// </example>
void ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
Guid ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob;

/// <summary>
Expand All @@ -141,8 +151,8 @@ void ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token
/// </remarks>
/// <param name="jobDelegate">The delegate to execute.</param>
/// <param name="token">An optional token to cancel the job.</param>
void ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default);

/// <returns>The job correlation id.</returns>
Guid ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default);
}

internal sealed partial class InstantJobRegistry : IInstantJobRegistry
Expand All @@ -168,71 +178,71 @@ public InstantJobRegistry(
}

/// <inheritdoc />
public void RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
public Guid RunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob => RunScheduledJob<TJob>(TimeSpan.Zero, parameter, token);

/// <inheritdoc />
public void RunInstantJob(Delegate jobDelegate, CancellationToken token = default) => RunScheduledJob(jobDelegate, TimeSpan.Zero, token);
public Guid RunInstantJob(Delegate jobDelegate, CancellationToken token = default) => RunScheduledJob(jobDelegate, TimeSpan.Zero, token);

/// <inheritdoc />
public void RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
public Guid RunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob
{
var utcNow = timeProvider.GetUtcNow();
RunJob<TJob>(utcNow + delay, parameter, false, token);
return RunJob<TJob>(utcNow + delay, parameter, false, token);
}

/// <inheritdoc />
public void RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
public Guid RunScheduledJob<TJob>(DateTimeOffset startDate, object? parameter = null, CancellationToken token = default)
where TJob : IJob =>
RunJob<TJob>(startDate, parameter, false, token);

/// <inheritdoc />
public void RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
public Guid RunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
{
var utcNow = timeProvider.GetUtcNow();
RunScheduledJob(jobDelegate, utcNow + delay, token);
return RunScheduledJob(jobDelegate, utcNow + delay, token);
}

/// <inheritdoc />
public void RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
public Guid RunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
RunDelegateJob(jobDelegate, startDate, false, token);

/// <inheritdoc />
public void ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
public Guid ForceRunScheduledJob<TJob>(TimeSpan delay, object? parameter = null, CancellationToken token = default)
where TJob : IJob
{
var utcNow = timeProvider.GetUtcNow();
RunJob<TJob>(utcNow + delay, parameter, true, token);
return RunJob<TJob>(utcNow + delay, parameter, true, token);
}

/// <inheritdoc />
public void ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
public Guid ForceRunScheduledJob(Delegate jobDelegate, TimeSpan delay, CancellationToken token = default)
{
var utcNow = timeProvider.GetUtcNow();
ForceRunScheduledJob(jobDelegate, utcNow + delay, token);
return ForceRunScheduledJob(jobDelegate, utcNow + delay, token);
}

/// <inheritdoc />
public void ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
public Guid ForceRunScheduledJob(Delegate jobDelegate, DateTimeOffset startDate, CancellationToken token = default) =>
RunDelegateJob(jobDelegate, startDate, true, token);

/// <inheritdoc />
public void ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default) =>
public Guid ForceRunInstantJob(Delegate jobDelegate, CancellationToken token = default) =>
ForceRunScheduledJob(jobDelegate, TimeSpan.Zero, token);

/// <inheritdoc />
public void ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
public Guid ForceRunInstantJob<TJob>(object? parameter = null, CancellationToken token = default)
where TJob : IJob => ForceRunScheduledJob<TJob>(TimeSpan.Zero, parameter, token);

private void RunDelegateJob(Delegate jobDelegate, DateTimeOffset startDate, bool forceExecution = false, CancellationToken token = default)
private Guid RunDelegateJob(Delegate jobDelegate, DateTimeOffset startDate, bool forceExecution = false, CancellationToken token = default)
{
var definition = jobRegistry.AddDynamicJob(jobDelegate);

RunInternal(definition, null, startDate, forceExecution, token);
return RunInternal(definition, null, startDate, forceExecution, token);
}

private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bool forceExecution = false, CancellationToken token = default)
private Guid RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bool forceExecution = false, CancellationToken token = default)
where TJob : IJob
{
using (logger.BeginScope("Triggering RunScheduledJob:"))
Expand All @@ -247,11 +257,11 @@ private void RunJob<TJob>(DateTimeOffset startDate, object? parameter = null, bo

token.Register(() => LogCancellationRequested(parameter));

RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
return RunInternal(jobDefinition, parameter, startDate, forceExecution, token);
}
}

private void RunInternal(
private Guid RunInternal(
JobDefinition jobDefinition,
object? parameter,
DateTimeOffset startDate,
Expand All @@ -273,6 +283,8 @@ private void RunInternal(
jobQueue.EnqueueForDirectExecution(run, startDate);
jobQueueManager.SignalJobQueue(run.JobDefinition.JobFullName);
}

return run.CorrelationId;
}

[LoggerMessage(LogLevel.Warning, "Job {JobName} cancelled by request.")]
Expand Down
3 changes: 2 additions & 1 deletion tests/NCronJob.Tests/RunDependentJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public async Task CorrelationIdIsSharedByJobsAndTheirDependencies()
var provider = CreateServiceProvider();
await provider.GetRequiredService<IHostedService>().StartAsync(CancellationToken);

provider.GetRequiredService<IInstantJobRegistry>().ForceRunInstantJob<PrincipalCorrelationIdJob>(token: CancellationToken);
var correlationId = provider.GetRequiredService<IInstantJobRegistry>().ForceRunInstantJob<PrincipalCorrelationIdJob>(token: CancellationToken);

await CommunicationChannel.Reader.ReadAsync(CancellationToken);
var storage = provider.GetRequiredService<Storage>();
storage.Guids.Count.ShouldBe(2);
storage.Guids.Distinct().Count().ShouldBe(1);
storage.Guids.First().ShouldBe(correlationId);
}

[Fact]
Expand Down

0 comments on commit 7334b45

Please sign in to comment.