Overview
The Jobs Service handles job scheduling, status tracking, and log management for workflow executions.
Service Details:
- Package: jobs
- Port: 50053
- Proto: proto/jobs/jobs.proto
Service Definition
// JobsService handles job scheduling, status tracking, and log management
// for workflow executions.
service JobsService {
// Schedules a new job for execution.
rpc ScheduleJob(ScheduleJobRequest) returns (ScheduleJobResponse) {}
// Internal API (used by job executors): updates the status of a running job.
rpc UpdateJobStatus(UpdateJobStatusRequest) returns (UpdateJobStatusResponse) {}
// Retrieves job details.
rpc GetJob(GetJobRequest) returns (GetJobResponse) {}
// Internal API (inter-service communication): retrieves a job by ID only;
// no user_id is required.
rpc GetJobByID(GetJobByIDRequest) returns (GetJobByIDResponse) {}
// Retrieves paginated job logs.
rpc GetJobLogs(GetJobLogsRequest) returns (GetJobLogsResponse) {}
// Streams job logs in real time (server streaming of Log messages).
rpc StreamJobLogs(StreamJobLogsRequest) returns (stream Log) {}
// Searches and filters job logs; replies with the same response message as
// GetJobLogs.
rpc SearchJobLogs(SearchJobLogsRequest) returns (GetJobLogsResponse) {}
// Lists all jobs for a workflow, with optional filters.
rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {}
}
Enums
LogStream
// LogStream identifies which output stream a log line belongs to, and is
// also used to filter logs by stream (stdout, stderr, or all).
enum LogStream {
// Zero value. NOTE(review): presumably means "no stream specified";
// confirm how the server treats it when used as a filter.
LOG_STREAM_UNSPECIFIED = 0;
// Standard output.
LOG_STREAM_STDOUT = 1;
// Standard error.
LOG_STREAM_STDERR = 2;
// Both stdout and stderr (filter value).
LOG_STREAM_ALL = 3;
}
RPC Methods
ScheduleJob
Schedule a new job for execution.
Request:
- workflow_id: ID of the workflow to execute
- scheduled_at: Timestamp when the job should run (RFC3339 format)
- trigger: Trigger type, either "AUTOMATIC" or "MANUAL"
Response:
Unique identifier of the scheduled job
Example:
req := &jobs.ScheduleJobRequest{
WorkflowId: "workflow-456",
UserId: "user-123",
ScheduledAt: time.Now().Add(5 * time.Minute).Format(time.RFC3339),
Trigger: "AUTOMATIC",
}
resp, err := client.ScheduleJob(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Job scheduled: %s\n", resp.Id)
UpdateJobStatus
Internal API: not publicly exposed. Used by job executors.
Update the status of a running job.
Request:
Container ID (if applicable)
Job status (e.g., "pending", "running", "completed", "failed")
Response:
Empty response on success.
GetJob
Retrieve job details.
Request:
User ID (for authorization)
Response:
Trigger type (AUTOMATIC or MANUAL)
Example:
req := &jobs.GetJobRequest{
Id: "job-789",
WorkflowId: "workflow-456",
UserId: "user-123",
}
resp, err := client.GetJob(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Job Status: %s\n", resp.Status)
GetJobByID
Internal API: not publicly exposed. Used for inter-service communication.
Retrieve job by ID only (no user_id required).
Request:
Response:
Same as GetJob response, plus user_id and container_id fields.
GetJobLogs
Retrieve paginated job logs.
Request:
GetJobLogsFilters:
Filter by log stream (stdout, stderr, or all)
Response:
Log Message:
Sequence number for ordering
Stream type (stdout or stderr)
Example:
// Request logs for job-789 from both stdout and stderr.
req := &jobs.GetJobLogsRequest{
	Id:         "job-789",
	WorkflowId: "workflow-456",
	UserId:     "user-123",
	// NOTE(review): an empty cursor presumably requests the first page;
	// confirm against the service's pagination contract.
	Cursor: "",
	Filters: &jobs.GetJobLogsFilters{
		Stream: jobs.LogStream_LOG_STREAM_ALL,
	},
}

resp, err := client.GetJobLogs(ctx, req)
if err != nil {
	log.Fatal(err)
}

// The loop variable is named entry rather than log so it does not shadow the
// standard library log package used above.
for _, entry := range resp.Logs {
	fmt.Printf("[%s] %s: %s\n", entry.Timestamp, entry.Stream, entry.Message)
}
StreamJobLogs
Stream job logs in real-time (server streaming).
Request: the example below sets id, workflow_id, and user_id.
Response:
Stream of Log messages.
Example:
// Open a server-side stream of logs for job-789.
req := &jobs.StreamJobLogsRequest{
	Id:         "job-789",
	WorkflowId: "workflow-456",
	UserId:     "user-123",
}

stream, err := client.StreamJobLogs(ctx, req)
if err != nil {
	log.Fatal(err)
}

// Receive log messages until the server closes the stream.
for {
	// The received message is named entry, not log: naming it log would
	// shadow the log package and make the log.Fatal call below fail to
	// compile.
	entry, err := stream.Recv()
	if err == io.EOF {
		// io.EOF marks the normal end of the stream.
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("[%s] %s\n", entry.Stream, entry.Message)
}
SearchJobLogs
Search and filter job logs.
Request:
- filters (SearchJobLogsFilters, required): Search filters
SearchJobLogsFilters:
Search query for log messages
Response:
Same as GetJobLogs response.
Example:
req := &jobs.SearchJobLogsRequest{
Id: "job-789",
WorkflowId: "workflow-456",
UserId: "user-123",
Filters: &jobs.SearchJobLogsFilters{
Stream: jobs.LogStream_LOG_STREAM_STDERR,
Message: "error",
},
}
resp, err := client.SearchJobLogs(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Found %d error logs\n", len(resp.Logs))
ListJobs
List all jobs for a workflow with optional filters.
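Request: the example below sets workflow_id, user_id, cursor (an empty string), and the optional filters.
ListJobsFilters: the example below filters by status and trigger.
Response: a list of jobs; the example below reads each job's id and status.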
Example:
req := &jobs.ListJobsRequest{
WorkflowId: "workflow-456",
UserId: "user-123",
Cursor: "",
Filters: &jobs.ListJobsFilters{
Status: "failed",
Trigger: "AUTOMATIC",
},
}
resp, err := client.ListJobs(ctx, req)
if err != nil {
log.Fatal(err)
}
for _, job := range resp.Jobs {
fmt.Printf("Job %s: %s\n", job.Id, job.Status)
}
Connection Example
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/hitesh22rana/chronoverse/pkg/proto/go/jobs"
)
// main runs the example and exits with a non-zero status if any step fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the Jobs Service at localhost:50053, schedules one job an
// hour from now, and logs the new job's ID.
//
// Failures are returned to main instead of calling log.Fatalf here:
// log.Fatalf exits via os.Exit, which skips deferred calls, so the deferred
// conn.Close would never run on the ScheduleJob failure path.
func run() error {
	// The connection uses insecure (plaintext) transport credentials.
	// NOTE(review): grpc.Dial is deprecated in recent grpc-go releases in
	// favor of grpc.NewClient; switch once the project's grpc-go version is
	// confirmed to provide it.
	conn, err := grpc.Dial(
		"localhost:50053",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		// Message text kept as-is so the program's output is unchanged.
		return fmt.Errorf("Failed to connect: %w", err)
	}
	defer conn.Close()

	client := pb.NewJobsServiceClient(conn)
	ctx := context.Background()

	// Schedule a job
	resp, err := client.ScheduleJob(ctx, &pb.ScheduleJobRequest{
		WorkflowId:  "workflow-456",
		UserId:      "user-123",
		ScheduledAt: time.Now().Add(1 * time.Hour).Format(time.RFC3339),
		Trigger:     "MANUAL",
	})
	if err != nil {
		return fmt.Errorf("ScheduleJob failed: %w", err)
	}

	log.Printf("Job scheduled: %s", resp.Id)
	return nil
}