New JobQueue worker

This commit is contained in:
Jamie Curnow 2022-07-15 08:52:38 +10:00
parent 3c0af95468
commit f51c12ed9a
10 changed files with 209 additions and 103 deletions

View File

@ -8,10 +8,11 @@ import (
"npm/internal/api"
"npm/internal/config"
"npm/internal/database"
"npm/internal/entity/certificate"
"npm/internal/entity/host"
"npm/internal/entity/setting"
"npm/internal/jobqueue"
"npm/internal/logger"
"npm/internal/state"
"npm/internal/worker"
)
var commit string
@ -21,13 +22,17 @@ var sentryDSN string
func main() {
config.InitArgs(&version, &commit)
config.Init(&version, &commit, &sentryDSN)
appstate := state.NewState()
database.Migrate(func() {
setting.ApplySettings()
database.CheckSetup()
go worker.StartCertificateWorker(appstate)
// Internal Job Queue
jobqueue.Start()
certificate.AddPendingJobs()
host.AddPendingJobs()
// Http server
api.StartServer()
irqchan := make(chan os.Signal, 1)
signal.Notify(irqchan, syscall.SIGINT, syscall.SIGTERM)
@ -40,6 +45,8 @@ func main() {
if err != nil {
logger.Error("DatabaseCloseError", err)
}
// nolint
jobqueue.Shutdown()
break
}
}

View File

@ -180,11 +180,11 @@ CREATE TABLE IF NOT EXISTS `host`
user_id INTEGER NOT NULL,
type TEXT NOT NULL,
host_template_id INTEGER NOT NULL,
listen_interface TEXT NOT NULL,
listen_interface TEXT NOT NULL DEFAULT "",
domain_names TEXT NOT NULL,
upstream_id INTEGER NOT NULL,
certificate_id INTEGER,
access_list_id INTEGER,
upstream_id INTEGER NOT NULL DEFAULT 0,
certificate_id INTEGER NOT NULL DEFAULT 0,
access_list_id INTEGER NOT NULL DEFAULT 0,
ssl_forced INTEGER NOT NULL DEFAULT 0,
caching_enabled INTEGER NOT NULL DEFAULT 0,
block_exploits INTEGER NOT NULL DEFAULT 0,
@ -192,9 +192,11 @@ CREATE TABLE IF NOT EXISTS `host`
http2_support INTEGER NOT NULL DEFAULT 0,
hsts_enabled INTEGER NOT NULL DEFAULT 0,
hsts_subdomains INTEGER NOT NULL DEFAULT 0,
paths TEXT NOT NULL,
paths TEXT NOT NULL DEFAULT "",
upstream_options TEXT NOT NULL DEFAULT "",
advanced_config TEXT NOT NULL DEFAULT "",
status TEXT NOT NULL DEFAULT "",
error_message TEXT NOT NULL DEFAULT "",
is_disabled INTEGER NOT NULL DEFAULT 0,
is_deleted INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES user (id),

View File

@ -8,6 +8,7 @@ import (
"npm/internal/database"
"npm/internal/entity"
"npm/internal/errors"
"npm/internal/jobqueue"
"npm/internal/logger"
"npm/internal/model"
)
@ -172,3 +173,25 @@ func GetByStatus(status string) ([]Model, error) {
return models, err
}
// AddPendingJobs is intended to be used at startup to add
// anything pending to the JobQueue just once, based on
// the database row status
func AddPendingJobs() {
rows, err := GetByStatus(StatusReady)
if err != nil {
logger.Error("AddPendingJobsError", err)
return
}
for _, row := range rows {
logger.Debug("Adding RequestCertificate job: %+v", row)
err := jobqueue.AddJob(jobqueue.Job{
Name: "RequestCertificate",
Action: row.Request,
})
if err != nil {
logger.Error("AddPendingJobsError", err)
}
}
}

View File

@ -50,6 +50,8 @@ func create(host *Model) (int, error) {
paths,
upstream_options,
advanced_config,
status,
error_message,
is_disabled,
is_deleted
) VALUES (
@ -73,6 +75,8 @@ func create(host *Model) (int, error) {
:paths,
:upstream_options,
:advanced_config,
:status,
:error_message,
:is_disabled,
:is_deleted
)`, host)
@ -86,6 +90,8 @@ func create(host *Model) (int, error) {
return 0, lastErr
}
logger.Debug("Created Host: %+v", host)
return int(last), nil
}
@ -120,10 +126,14 @@ func update(host *Model) error {
paths = :paths,
upstream_options = :upstream_options,
advanced_config = :advanced_config,
status = :status,
error_message = :error_message,
is_disabled = :is_disabled,
is_deleted = :is_deleted
WHERE id = :id`, host)
logger.Debug("Updated Host: %+v", host)
return err
}
@ -181,3 +191,10 @@ func List(pageInfo model.PageInfo, filters []model.Filter, expand []string) (Lis
return result, nil
}
// AddPendingJobs is intended to be used at startup to add
// anything pending to the JobQueue just once, based on
// the database row status
func AddPendingJobs() {
// todo
}

View File

@ -20,6 +20,12 @@ const (
RedirectionHostType = "redirection"
// DeadHostType is self explanatory
DeadHostType = "dead"
// StatusReady means a host is ready to configure
StatusReady = "ready"
// StatusOK means a host is configured within Nginx
StatusOK = "ok"
// StatusError is self explanatory
StatusError = "error"
)
// Model is the user model
@ -45,6 +51,8 @@ type Model struct {
Paths string `json:"paths" db:"paths" filter:"paths,string"`
UpstreamOptions string `json:"upstream_options" db:"upstream_options" filter:"upstream_options,string"`
AdvancedConfig string `json:"advanced_config" db:"advanced_config" filter:"advanced_config,string"`
Status string `json:"status" db:"status" filter:"status,string"`
ErrorMessage string `json:"error_message" db:"error_message" filter:"error_message,string"`
IsDisabled bool `json:"is_disabled" db:"is_disabled" filter:"is_disabled,boolean"`
IsDeleted bool `json:"is_deleted,omitempty" db:"is_deleted"`
// Expansions
@ -81,6 +89,9 @@ func (m *Model) Save() error {
return fmt.Errorf("User ID must be specified")
}
// Set this host as requiring reconfiguration
m.Status = StatusReady
if m.ID == 0 {
m.ID, err = create(m)
} else {

View File

@ -0,0 +1,46 @@
package jobqueue
import (
"context"
"errors"
)
var (
ctx context.Context
cancel context.CancelFunc
worker *Worker
)
// Start ...
func Start() {
ctx, cancel = context.WithCancel(context.Background())
q := &Queue{
jobs: make(chan Job),
ctx: ctx,
cancel: cancel,
}
// Defines a queue worker, which will execute our queue.
worker = newWorker(q)
// Execute jobs in queue.
go worker.doWork()
}
// AddJob adds a job to the queue for processing
func AddJob(j Job) error {
if worker == nil {
return errors.New("Unable to add job, jobqueue has not been started")
}
worker.Queue.AddJob(j)
return nil
}
// Shutdown ...
func Shutdown() error {
if cancel == nil {
return errors.New("Unable to shutdown, jobqueue has not been started")
}
cancel()
return nil
}

View File

@ -0,0 +1,58 @@
package jobqueue
import (
"context"
"log"
"sync"
)
// Queue holds name, list of jobs and context with cancel.
type Queue struct {
jobs chan Job
ctx context.Context
cancel context.CancelFunc
}
// Job - holds logic to perform some operations during queue execution.
type Job struct {
Name string
Action func() error // A function that should be executed when the job is running.
}
// AddJobs adds jobs to the queue and cancels channel.
func (q *Queue) AddJobs(jobs []Job) {
var wg sync.WaitGroup
wg.Add(len(jobs))
for _, job := range jobs {
// Goroutine which adds job to the queue.
go func(job Job) {
q.AddJob(job)
wg.Done()
}(job)
}
go func() {
wg.Wait()
// Cancel queue channel, when all goroutines were done.
q.cancel()
}()
}
// AddJob sends job to the channel.
func (q *Queue) AddJob(job Job) {
q.jobs <- job
log.Printf("New job %s added to queue", job.Name)
}
// Run performs job execution.
func (j Job) Run() error {
log.Printf("Job running: %s", j.Name)
err := j.Action()
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,36 @@
package jobqueue
import (
"fmt"
"npm/internal/logger"
)
// Worker responsible for queue serving.
type Worker struct {
Queue *Queue
}
func newWorker(queue *Queue) *Worker {
return &Worker{
Queue: queue,
}
}
// doWork processes jobs from the queue (jobs channel).
func (w *Worker) doWork() bool {
for {
select {
// if context was canceled.
case <-w.Queue.ctx.Done():
logger.Info("JobQueue worker graceful shutdown")
return true
// if job received.
case job := <-w.Queue.jobs:
err := job.Run()
if err != nil {
logger.Error(fmt.Sprintf("%sError", job.Name), err)
continue
}
}
}
}

View File

@ -1,31 +0,0 @@
package state
import (
"sync"
)
// AppState holds pointers to channels and waitGroups
// shared by all goroutines of the application
type AppState struct {
waitGroup sync.WaitGroup
termSig chan bool
}
// NewState creates a new app state
func NewState() *AppState {
state := &AppState{
// buffered channel
termSig: make(chan bool, 1),
}
return state
}
// GetWaitGroup returns the state's wg
func (state *AppState) GetWaitGroup() *sync.WaitGroup {
return &state.waitGroup
}
// GetTermSig returns the state's term signal
func (state *AppState) GetTermSig() chan bool {
return state.termSig
}

View File

@ -1,63 +0,0 @@
package worker
import (
"time"
"npm/internal/entity/certificate"
"npm/internal/logger"
"npm/internal/state"
)
type certificateWorker struct {
state *state.AppState
}
// StartCertificateWorker starts the CertificateWorker
func StartCertificateWorker(state *state.AppState) {
worker := newCertificateWorker(state)
logger.Info("CertificateWorker Started")
worker.Run()
}
func newCertificateWorker(state *state.AppState) *certificateWorker {
return &certificateWorker{
state: state,
}
}
// Run the CertificateWorker
func (w *certificateWorker) Run() {
// global wait group
gwg := w.state.GetWaitGroup()
gwg.Add(1)
ticker := time.NewTicker(15 * time.Second)
mainLoop:
for {
select {
case _, more := <-w.state.GetTermSig():
if !more {
logger.Info("Terminating CertificateWorker ... ")
break mainLoop
}
case <-ticker.C:
// Can confirm that this will wait for completion before the next loop
requestCertificates()
}
}
}
func requestCertificates() {
// logger.Debug("requestCertificates fired")
rows, err := certificate.GetByStatus(certificate.StatusReady)
if err != nil {
logger.Error("requestCertificatesError", err)
return
}
for _, row := range rows {
if err := row.Request(); err != nil {
logger.Error("CertificateRequestError", err)
}
}
}