Overview
The Workflows Service manages the complete lifecycle of workflows including creation, updates, termination, and deletion.
Service Details:
- Package: workflows
- Port: 50052
- Proto: proto/workflows/workflows.proto
Service Definition
// WorkflowsService manages the workflow lifecycle: creation, updates,
// termination, and deletion.
service WorkflowsService {
  // Create a new workflow.
  rpc CreateWorkflow(CreateWorkflowRequest) returns (CreateWorkflowResponse);

  // Update an existing workflow's configuration.
  rpc UpdateWorkflow(UpdateWorkflowRequest) returns (UpdateWorkflowResponse);

  // Internal: update the build status of a workflow.
  rpc UpdateWorkflowBuildStatus(UpdateWorkflowBuildStatusRequest) returns (UpdateWorkflowBuildStatusResponse);

  // Retrieve workflow details by ID and user ID.
  rpc GetWorkflow(GetWorkflowRequest) returns (GetWorkflowResponse);

  // Internal: retrieve a workflow by ID only (no user_id required).
  rpc GetWorkflowByID(GetWorkflowByIDRequest) returns (GetWorkflowByIDResponse);

  // Internal: increment the consecutive job failures counter; called when jobs fail.
  rpc IncrementWorkflowConsecutiveJobFailuresCount(IncrementWorkflowConsecutiveJobFailuresCountRequest) returns (IncrementWorkflowConsecutiveJobFailuresCountResponse);

  // Internal: reset the consecutive job failures counter to zero; called when jobs succeed.
  rpc ResetWorkflowConsecutiveJobFailuresCount(ResetWorkflowConsecutiveJobFailuresCountRequest) returns (ResetWorkflowConsecutiveJobFailuresCountResponse);

  // Terminate a workflow (stop scheduling new jobs).
  rpc TerminateWorkflow(TerminateWorkflowRequest) returns (TerminateWorkflowResponse);

  // Permanently delete a workflow.
  rpc DeleteWorkflow(DeleteWorkflowRequest) returns (DeleteWorkflowResponse);

  // List all workflows for a user with optional filters.
  rpc ListWorkflows(ListWorkflowsRequest) returns (ListWorkflowsResponse);
}
RPC Methods
CreateWorkflow
Create a new workflow.
Request:
- user_id: ID of the user creating the workflow
- name: Name of the workflow
- payload: JSON string containing workflow configuration
- kind: Type of workflow (e.g., “http”, “script”, “docker”)
- interval: Execution interval in minutes
- max_consecutive_job_failures_allowed: Maximum consecutive failures before workflow termination
Response:
- id: Unique identifier of the created workflow
Example:
// Create a daily HTTP health-check workflow that allows at most three
// consecutive job failures.
req := &workflows.CreateWorkflowRequest{
	UserId:                           "user-123",
	Name:                             "Daily Health Check",
	Payload:                          `{"url": "https://api.example.com/health"}`,
	Kind:                             "http",
	Interval:                         1440, // minutes: 24 * 60 for a daily run
	MaxConsecutiveJobFailuresAllowed: 3,
}
resp, err := client.CreateWorkflow(ctx, req)
if err != nil {
	log.Fatal(err)
}
// The response carries the identifier of the newly created workflow.
fmt.Printf("Workflow created: %s\n", resp.Id)
UpdateWorkflow
Update an existing workflow’s configuration.
Request:
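- id: ID of the workflow to update
- user_id: User ID (for authorization)
- name: Updated workflow name
- payload: Updated JSON configuration
- interval: Updated interval in minutes
- max_consecutive_job_failures_allowed: Updated failure threshold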
Response:
Empty response on success.
Example:
req := &workflows.UpdateWorkflowRequest{
Id: "workflow-456",
UserId: "user-123",
Name: "Hourly Health Check",
Payload: `{"url": "https://api.example.com/health", "timeout": 30}`,
Interval: 60,
MaxConsecutiveJobFailuresAllowed: 5,
}
_, err := client.UpdateWorkflow(ctx, req)
if err != nil {
log.Fatal(err)
}
UpdateWorkflowBuildStatus
Internal API - Not exposed to public. Used for inter-service communication.
Update the build status of a workflow.
Request:
Build status (e.g., “pending”, “building”, “success”, “failed”)
Response:
Empty response on success.
GetWorkflow
Retrieve workflow details by ID and user ID.
Request:
- id: Workflow ID
- user_id: User ID (for authorization)
Response:
- Execution interval in minutes
- consecutive_job_failures_count: Current consecutive failure count
- max_consecutive_job_failures_allowed: Maximum allowed consecutive failures
- Termination timestamp (if terminated)
Example:
req := &workflows.GetWorkflowRequest{
Id: "workflow-456",
UserId: "user-123",
}
resp, err := client.GetWorkflow(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Workflow: %s (Status: %s)\n", resp.Name, resp.BuildStatus)
GetWorkflowByID
Internal API - Not exposed to public. Used for inter-service communication.
Retrieve workflow by ID only (no user_id required).
Request:
Response:
Same as GetWorkflow response, plus user_id field.
IncrementWorkflowConsecutiveJobFailuresCount
Internal API - Not exposed to public. Called when jobs fail.
Increment the consecutive job failures counter.
Request:
Response:
True if max failures threshold was reached
ResetWorkflowConsecutiveJobFailuresCount
Internal API - Not exposed to public. Called when jobs succeed.
Reset the consecutive job failures counter to zero.
Request:
Response:
Empty response on success.
TerminateWorkflow
Terminate a workflow (stop scheduling new jobs).
Request:
Response:
Empty response on success.
Example:
req := &workflows.TerminateWorkflowRequest{
Id: "workflow-456",
UserId: "user-123",
}
_, err := client.TerminateWorkflow(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Println("Workflow terminated")
DeleteWorkflow
Permanently delete a workflow.
Request:
Response:
Empty response on success.
Example:
req := &workflows.DeleteWorkflowRequest{
Id: "workflow-456",
UserId: "user-123",
}
_, err := client.DeleteWorkflow(ctx, req)
if err != nil {
log.Fatal(err)
}
fmt.Println("Workflow deleted")
ListWorkflows
List all workflows for a user with optional filters.
Request:
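- user_id: ID of the user whose workflows are listed
- cursor: Pagination cursor (empty for first page)
- filters: Optional ListWorkflowsFilters
ListWorkflowsFilters:
- kind: Filter by workflow kind
- build_status: Filter by build status
- Filter terminated workflows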
Response:
- workflows (WorkflowsByUserIDResponse[]): Array of workflow objects
- Cursor for next page (empty if no more pages)
Example:
req := &workflows.ListWorkflowsRequest{
UserId: "user-123",
Cursor: "",
Filters: &workflows.ListWorkflowsFilters{
Kind: "http",
BuildStatus: "success",
},
}
resp, err := client.ListWorkflows(ctx, req)
if err != nil {
log.Fatal(err)
}
for _, wf := range resp.Workflows {
fmt.Printf("Workflow: %s (ID: %s)\n", wf.Name, wf.Id)
}
Connection Example
package main
import (
"context"
"log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/hitesh22rana/chronoverse/pkg/proto/go/workflows"
)
// main creates a client for the Workflows Service at localhost:50052
// (plaintext, no TLS), creates one workflow, and logs the returned ID.
func main() {
	// grpc.NewClient supersedes the deprecated grpc.Dial. It performs no
	// I/O; the connection is established lazily when the first RPC is made,
	// so an error here means the client could not be constructed.
	conn, err := grpc.NewClient(
		"localhost:50052",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer conn.Close()

	client := pb.NewWorkflowsServiceClient(conn)
	ctx := context.Background()

	// Create workflow
	resp, err := client.CreateWorkflow(ctx, &pb.CreateWorkflowRequest{
		UserId:                           "user-123",
		Name:                             "API Monitor",
		Payload:                          `{"url": "https://api.example.com"}`,
		Kind:                             "http",
		Interval:                         30, // minutes
		MaxConsecutiveJobFailuresAllowed: 3,
	})
	if err != nil {
		log.Fatalf("CreateWorkflow failed: %v", err)
	}

	log.Printf("Created workflow: %s", resp.Id)
}