New Queue System

Problem and Context

Qmulum has been running off a file-based queue system. It was built when I was still trying to keep everything file-based, before I decided that SQLite would provide major benefits and was worth the complexity.

The existing queue system is too specific to image transformations. It’s geared towards transforming a single file (e.g. rotating an image or generating a thumbnail). Adding a queue task that processes a multipart file feels like shoehorning the requirement into a solution that wasn’t designed to handle that type of process.

Design goals

Options and Thoughts

I have been wondering whether it’s better to build a scripting language of sorts, or to keep this functionality in Go. Scripting has the benefit of being simple, fully text based and independent of code deployment, but it does involve more work upfront.

How likely are we to need user customisable queues? It’s hard to say.

Is there a combination of the two? For example, a Go solution that allows users to string together, or chain, operations down the track?

I also need to keep things simple.

Exploring a non script-based solution.

How do we handle contention? In theory, we could run multiple queue tasks at the same time, making use of the processor’s multiple cores. To do this, there are certain shared resources for which we need to manage contention:

Operations:

Tasks

QueueProcessor

TaskParameter

Task

TaskChain

Steps

  1. Remove the file-specific queue. The queue should manage a generic task, not necessarily one based on a single file as is currently the case.
file1.tostream().join(file2,file3).verifyhash().saveToFile()
in = file1.tostream()
in.resize(500).saveToFile()

Something like:

// Task
func rotateAdjustBrightnessAndSave(parameters) {
    inputfile = parameters.get("inputfilekey")
    inputfile.tostream().rotate(parameters.get("degrees")).adjustbrightness().saveToFile(parameters.get("outfile"))
}

func createThumbnails(parameters) {
    inputfile = parameters.get("inputfilekey")
    stream = inputfile.tostream()
    stream.resize(500).saveToFile(parameters.get("smalloutfile"))
    stream.resize(1800).saveToFile(parameters.get("largeoutfile"))
}

QueueProcessor.AddTask(rotateAdjustBrightnessAndSave,{degrees:90,inputfilekey:"",outfile:"/x/y/z.jpg"})
QueueProcessor.AddTask(func(parameters) {
    
})
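
For comparison, here is a rough sketch of how the chained form could look in plain Go, assuming each operation is just a function over a byte slice. Every name in it (Op, Chain, Then, Run, Join, VerifyLength) is hypothetical, and the two operations are simple stand-ins for joining file parts and verifying a hash, not real image or hash code.

```go
package main

import (
	"bytes"
	"fmt"
)

// Op is a hypothetical single operation: it takes the current data and
// returns the transformed data or an error.
type Op func(in []byte) ([]byte, error)

// Chain is an ordered list of operations applied one after the other.
type Chain struct {
	ops []Op
}

// Then appends an operation and returns the chain so calls can be strung together.
func (c *Chain) Then(op Op) *Chain {
	c.ops = append(c.ops, op)
	return c
}

// Run feeds the input through every operation, stopping at the first error.
func (c *Chain) Run(in []byte) ([]byte, error) {
	data := in
	for i, op := range c.ops {
		out, err := op(data)
		if err != nil {
			return nil, fmt.Errorf("step %d: %w", i, err)
		}
		data = out
	}
	return data, nil
}

// Join is a stand-in for joining multipart file parts onto the input.
func Join(parts ...[]byte) Op {
	return func(in []byte) ([]byte, error) {
		return append(append([]byte{}, in...), bytes.Join(parts, nil)...), nil
	}
}

// VerifyLength is a stand-in for a hash check: it fails if the length is wrong.
func VerifyLength(want int) Op {
	return func(in []byte) ([]byte, error) {
		if len(in) != want {
			return nil, fmt.Errorf("length %d, want %d", len(in), want)
		}
		return in, nil
	}
}

func main() {
	chain := new(Chain).Then(Join([]byte("bb"), []byte("cc"))).Then(VerifyLength(6))
	out, err := chain.Run([]byte("aa"))
	fmt.Println(string(out), err)
}
```

A chain built this way lives only in memory as Go closures, so it cannot be written to a table as-is.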


BUT how do I persist the above definition to the database?
| Column             | Type     | Description                                                                                                                                                                                                                                                                                                                                       | Example value              |
|--------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------|
| task_id            | INT      | Primary key of the task. Auto-increment. It is also used as the sequence in which tasks should be executed                                                                                                                                                                                                                                        | 1                          |
| task_function_name | TEXT     | The name of the function to be executed.                                                                                                                                                                                                                                                                                                          | joinMultipartFiles         |
| task_headline      | TEXT     | A 'headline' that can be used in an Admin screen to see a description of what the task is, without needing to examine the parameters.                                                                                                                                                                                                             | Rotate file xxyy123        |
| time_added         | DATETIME | The date and time that this task was added to the queue.                                                                                                                                                                                                                                                                                          | 2024-08-31 20:28:06        |
| shared_resource    | TEXT     | A value used to prevent contention when concurrently processing tasks from the queue. Tasks with the same shared_resource value will be processed in sequence and not at the same time as other tasks with the same shared_resource value.  If this value is set to 'EXCLUSIVE', the task will not be processed concurrently with any other task. | xxyy213.jpg  or  EXCLUSIVE |
| status             | TEXT     | The run status of this task. One of PENDING, RUNNING, DONE, FAILED.                                                                                                                                                                                                                                                                               | PENDING                    |

TASK_PARAM (user would be a parameter)

| Column        | Type            |
|---------------|-----------------|
| task_param_id | INT PRIMARY KEY |
| key           |                 |
| val           |                 |
| task_id       |                 |

TASK_LOG (logs when a task was started, completed, failed, etc)

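| Column      | Notes                                         |
|-------------|-----------------------------------------------|
| task_log_id |                                               |
| task_id     |                                               |
| log_time    |                                               |
| log_type    | One of STARTED, COMPLETED, FAILED, REQUEUED.  |
| message     |                                               |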
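
With the function stored by name and the parameters stored as key/value rows, one possible answer to the persistence question is a registry that maps task_function_name to a Go function. The sketch below assumes all parameter values are strings. Registry, Params, TaskFunc and ErrUnknownTask are hypothetical names, and "in.jpg" is only a placeholder value.

```go
package main

import (
	"errors"
	"fmt"
)

// Params mirrors the TASK_PARAM rows of one task as key/val pairs.
type Params map[string]string

// TaskFunc is the signature every registered task function shares.
type TaskFunc func(p Params) error

// ErrUnknownTask is returned when no function is registered under a name.
var ErrUnknownTask = errors.New("unknown task function")

// Registry maps a task_function_name to the Go function that implements it.
type Registry struct {
	funcs map[string]TaskFunc
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{funcs: make(map[string]TaskFunc)}
}

// Register makes fn available under name.
func (r *Registry) Register(name string, fn TaskFunc) {
	r.funcs[name] = fn
}

// Run looks up the function by name and calls it with the given parameters.
func (r *Registry) Run(name string, p Params) error {
	fn, ok := r.funcs[name]
	if !ok {
		return fmt.Errorf("%w: %s", ErrUnknownTask, name)
	}
	return fn(p)
}

func main() {
	reg := NewRegistry()
	reg.Register("rotateAdjustBrightnessAndSave", func(p Params) error {
		// A real task would do the image work here; this one only reports it.
		fmt.Printf("rotate %s by %s degrees into %s\n", p["inputfilekey"], p["degrees"], p["outfile"])
		return nil
	})

	// These values would be loaded from the task row and its TASK_PARAM rows.
	err := reg.Run("rotateAdjustBrightnessAndSave", Params{
		"inputfilekey": "in.jpg",
		"degrees":      "90",
		"outfile":      "/x/y/z.jpg",
	})
	fmt.Println(err)
}
```

Only the name and the parameters are persisted, so anonymous functions passed straight to AddTask cannot be stored this way. Every task function would need to be registered under a stable name at startup.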


### FuncLang - A script based option
FuncLang is a very basic language, more so from the programmer's perspective than the user's.
It has functions. It has values.
It has comments, but only /* these ones */.

Example 1 - Applying a transformation to files meeting criteria. Saving a copy.

PARAMS(), FOREACH_DO(

X,

/* Rotate 90 deg and save as filename_rotated */
SAVE_TO_FILE(
    ROTATE_BY(STREAM(X),90), /* What we want to save */
    CONCAT(
        FILEPATH_WITHOUT_EXTENSION(ATTR(X,"key")),
        "_rotated",
        EXTENSION(ATTR(X,"key")))
), 

FILES_WHERE( /* Returns a list of files */
    AND(
        HAS_ALL_LABELS("foo"),
        GREATER_THAN(SIZE(),10)
    )
)

)
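
To get a feel for the upfront work, here is a minimal sketch of a recursive descent parser for a single FuncLang expression. The grammar is my assumption from the example: a function call is NAME(arg, ...), a bare NAME is a variable, and values are double-quoted strings or integers. It does not handle a top-level sequence such as PARAMS() followed by another call, and it ignores trailing input. Node, parser and Parse are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Node is a function call, a variable reference, or a literal value.
type Node struct {
	Name  string  // function or variable name; empty for literals
	Value string  // text of a string or number literal
	Call  bool    // true when Name was followed by "("
	Args  []*Node // arguments of a call
}

type parser struct {
	src string
	pos int
}

func isIdent(c byte) bool {
	return c == '_' || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')
}

// skip moves past whitespace and /* ... */ comments.
func (p *parser) skip() {
	for p.pos < len(p.src) {
		if strings.HasPrefix(p.src[p.pos:], "/*") {
			end := strings.Index(p.src[p.pos+2:], "*/")
			if end < 0 {
				p.pos = len(p.src) // unterminated comment: consume the rest
				return
			}
			p.pos += end + 4
		} else if c := p.src[p.pos]; c == ' ' || c == '\t' || c == '\n' || c == '\r' {
			p.pos++
		} else {
			return
		}
	}
}

// expr parses one string, number, variable, or function call.
func (p *parser) expr() (*Node, error) {
	p.skip()
	if p.pos >= len(p.src) {
		return nil, fmt.Errorf("unexpected end of input")
	}
	start, c := p.pos, p.src[p.pos]
	switch {
	case c == '"':
		end := strings.IndexByte(p.src[start+1:], '"')
		if end < 0 {
			return nil, fmt.Errorf("unterminated string at %d", start)
		}
		p.pos += end + 2
		return &Node{Value: p.src[start+1 : start+1+end]}, nil
	case c >= '0' && c <= '9':
		for p.pos < len(p.src) && p.src[p.pos] >= '0' && p.src[p.pos] <= '9' {
			p.pos++
		}
		return &Node{Value: p.src[start:p.pos]}, nil
	case isIdent(c):
		for p.pos < len(p.src) && isIdent(p.src[p.pos]) {
			p.pos++
		}
		n := &Node{Name: p.src[start:p.pos]}
		p.skip()
		if p.pos < len(p.src) && p.src[p.pos] == '(' {
			p.pos++
			n.Call = true
			for first := true; ; first = false {
				p.skip()
				if p.pos >= len(p.src) {
					return nil, fmt.Errorf("missing ')' for %s", n.Name)
				}
				if p.src[p.pos] == ')' {
					p.pos++
					return n, nil
				}
				if !first {
					if p.src[p.pos] != ',' {
						return nil, fmt.Errorf("unexpected %q at %d", p.src[p.pos], p.pos)
					}
					p.pos++
				}
				arg, err := p.expr()
				if err != nil {
					return nil, err
				}
				n.Args = append(n.Args, arg)
			}
		}
		return n, nil
	}
	return nil, fmt.Errorf("unexpected %q at %d", c, p.pos)
}

// Parse parses a single expression from src.
func Parse(src string) (*Node, error) {
	return (&parser{src: src}).expr()
}

func main() {
	n, err := Parse(`ROTATE_BY(STREAM(X), 90) /* rotate */`)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(n.Name, len(n.Args), n.Args[0].Args[0].Name, n.Args[1].Value)
}
```

Parsing is the small part. An evaluator, variable scoping for things like X in FOREACH_DO, and error reporting would all still be needed on top of this.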


Example 2 - Join multipart file parts.

PARAMS() SET_VAR(JOINED_FILE,NEW_FILE()) FOREACH_DO(

X,

DO(
    SAVE_TO_FILE(        
        APPEND_FILES(X,JOINED_FILE),
        "/multi/joined.part"
    ),
    VERIFY_HASH("xxhash333","/multi/joined.part"),
    UPLOAD("/multi/joined.part","/the/key.jpg"),
    DELETE_FILE("/multi/joined.part")
),


LIST("/multi/123.part","/multi/234.part")

)


Example 3 - Create thumbnails for a file.

PARAMS(INPUT_FILE_STREAM,[ ])
