Qmulum has been running on a file-based queue system. I built it when I was still trying to keep everything file-based, before I decided that SQLite would provide major benefits and was worth the complexity.
The existing queue system is a bit too specific to image transformations. It’s geared towards transforming a single file (e.g. rotating an image or generating a thumbnail). When I try to add a queue task that processes a multipart file, it feels like I’m shoehorning the requirement into a solution that wasn’t designed for that type of process.
I have been wondering whether it’s better to build a scripting language of sorts, or to keep this functionality in Go. Scripting has the benefit of being simple, fully text-based and independent of code deployment. But it does involve more work upfront.
How likely are we to need user-customisable queues? It’s hard to say.
Is there a combination of the two? A Go solution that allows users to string together, or chain, operations down the track?
I also need to keep things simple.
How do we handle contention? In theory, we could run multiple queue tasks at the same time, making use of the processor’s multiple cores. To do this, we need to manage contention for certain shared resources:

- Files – we don’t want to have two tasks operating on the same file.
- The queue database – this could especially be a problem when using the system database. I think we might need to implement something to avoid contention, or introduce Postgres. But that adds environmental complexity, probably too much, too early. Given how small these transactions are, we could simply see whether we run into lock errors, and deal with that when (or if) it happens.
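The file side of this could be handled with a small in-memory registry of resources that are currently in use. The following is a minimal sketch, assuming a single process runs all workers; `ResourceLocks`, `TryAcquire` and `Release` are hypothetical names, not existing Qmulum code.

```go
package main

import (
	"fmt"
	"sync"
)

// ResourceLocks tracks which shared resources (for example file keys) are
// currently claimed by a running task. Hypothetical helper for illustration.
type ResourceLocks struct {
	mu    sync.Mutex
	inUse map[string]bool
}

// NewResourceLocks returns an empty registry.
func NewResourceLocks() *ResourceLocks {
	return &ResourceLocks{inUse: make(map[string]bool)}
}

// TryAcquire claims the resource and reports true, or reports false if
// another task already holds it. It never blocks.
func (r *ResourceLocks) TryAcquire(key string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.inUse[key] {
		return false
	}
	r.inUse[key] = true
	return true
}

// Release frees the resource so that a later task can claim it.
func (r *ResourceLocks) Release(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.inUse, key)
}

func main() {
	locks := NewResourceLocks()
	fmt.Println(locks.TryAcquire("xxyy213.jpg")) // first task claims the file
	fmt.Println(locks.TryAcquire("xxyy213.jpg")) // a second task has to wait
	locks.Release("xxyy213.jpg")
	fmt.Println(locks.TryAcquire("xxyy213.jpg")) // the file is free again
}
```

A worker that fails to acquire a resource would leave the task in the queue and move on to the next one, rather than blocking a core.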

Components:

- A generic processor to log tasks (e.g. QueueProcessor.AddTask(task))
- A generic bundle of parameters that can be added to a task
- A task interface

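In Go, these three pieces might look roughly like the sketch below. It is an in-memory illustration only: the method names on `Task`, the `RunAll` method and the `rotateTask` stand-in are assumptions, and nothing here persists to the database or touches real images.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskParameters is a generic bundle of named values attached to a task.
type TaskParameters map[string]string

// Task is anything the queue can run (assumed shape of the task interface).
type Task interface {
	Name() string
	Run(params TaskParameters) error
}

// queuedTask pairs a task with the parameters it should run with.
type queuedTask struct {
	task   Task
	params TaskParameters
}

// QueueProcessor keeps pending tasks in memory, in the order they were added.
type QueueProcessor struct {
	pending []queuedTask
}

// AddTask logs a task on the queue.
func (q *QueueProcessor) AddTask(t Task, p TaskParameters) {
	q.pending = append(q.pending, queuedTask{task: t, params: p})
}

// RunAll runs tasks in order and stops at the first failure. It returns the
// number of tasks that completed and the error that stopped it, if any.
func (q *QueueProcessor) RunAll() (int, error) {
	done := 0
	for len(q.pending) > 0 {
		next := q.pending[0]
		q.pending = q.pending[1:]
		if err := next.task.Run(next.params); err != nil {
			return done, fmt.Errorf("task %s: %w", next.task.Name(), err)
		}
		done++
	}
	return done, nil
}

// rotateTask is a stand-in that only validates its parameters.
type rotateTask struct{}

func (rotateTask) Name() string { return "rotate" }

func (rotateTask) Run(p TaskParameters) error {
	if p["degrees"] == "" {
		return errors.New("missing degrees parameter")
	}
	return nil // a real task would transform the file here
}

func main() {
	q := &QueueProcessor{}
	q.AddTask(rotateTask{}, TaskParameters{"degrees": "90"})
	n, err := q.RunAll()
	fmt.Println("completed:", n, "error:", err)
}
```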
Operations:
Tasks
QueueProcessor
TaskParameter
Task
TaskChain
file1.tostream().join(file2,file3).verifyhash().saveToFile()
in = file1.tostream()
in.resize(500).saveToFile()
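Chaining like this is awkward in Go because every step can fail. One way around that is a chain value that remembers the first error and turns later steps into no-ops. The sketch below works on byte slices, and uses SHA-256 purely as a stand-in hash; `Chain`, `FromBytes`, `Join`, `VerifyHash`, `SaveTo` and `Result` are all hypothetical names.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
)

// Chain carries the data and the first error through a series of steps.
type Chain struct {
	data []byte
	err  error
}

// FromBytes starts a chain from a copy of b.
func FromBytes(b []byte) *Chain {
	return &Chain{data: append([]byte(nil), b...)}
}

// Join appends each part in order. It does nothing once a step has failed.
func (c *Chain) Join(parts ...[]byte) *Chain {
	if c.err != nil {
		return c
	}
	for _, p := range parts {
		c.data = append(c.data, p...)
	}
	return c
}

// VerifyHash records an error if the SHA-256 of the data differs from wantHex.
func (c *Chain) VerifyHash(wantHex string) *Chain {
	if c.err != nil {
		return c
	}
	sum := sha256.Sum256(c.data)
	if got := hex.EncodeToString(sum[:]); got != wantHex {
		c.err = fmt.Errorf("hash mismatch: got %s, want %s", got, wantHex)
	}
	return c
}

// SaveTo writes the data to w, unless an earlier step failed.
func (c *Chain) SaveTo(w io.Writer) error {
	if c.err != nil {
		return c.err
	}
	_, err := w.Write(c.data)
	return err
}

// Result exposes the current data and the first error, if any.
func (c *Chain) Result() ([]byte, error) {
	return c.data, c.err
}

func main() {
	sum := sha256.Sum256([]byte("part1part2part3"))
	want := hex.EncodeToString(sum[:])

	var out bytes.Buffer
	err := FromBytes([]byte("part1")).
		Join([]byte("part2"), []byte("part3")).
		VerifyHash(want).
		SaveTo(&out)
	fmt.Println(out.String(), err)
}
```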
Something like:
// Task: rotate the input file, adjust brightness, and save the result
func rotateAdjustBrightnessAndSave(parameters) {
    inputfile = parameters.get("inputfilekey")
    inputfile.tostream().rotate(parameters.get("degrees")).adjustbrightness().saveToFile(parameters.get("outfile"))
}
// Task: save a large and a small copy of the input file
func createThumbnails(parameters) {
    inputfile = parameters.get("inputfilekey")
    stream = inputfile.tostream()
    stream.resize(1800).saveToFile(parameters.get("largeoutfile"))
    stream.resize(500).saveToFile(parameters.get("smalloutfile"))
}
// Queue a named task; the parameter keys match the ones the task reads
QueueProcessor.AddTask(rotateAdjustBrightnessAndSave,{degrees:90,inputfilekey:"",outfile:"/x/y/z.jpg"})
// Queue an anonymous task
QueueProcessor.AddTask(func(parameters) {
})
BUT how do I persist the above definition to the database?
| Column | Type | Description | Example value |
|--------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------|
| task_id | INT | Primary key of the task (auto-increment). It also defines the order in which tasks should be executed. | 1 |
| task_function_name | TEXT | The name of the function to be executed. | joinMultipartFiles |
| task_headline | TEXT | A 'headline' that can be used in an Admin screen to see a description of what the task is, without needing to examine the parameters. | Rotate file xxyy123 |
| time_added | DATETIME | The date and time that this task was added to the queue. | 2024-08-31 20:28:06 |
| shared_resource | TEXT | A value used to prevent contention when concurrently processing tasks from the queue. Tasks with the same shared_resource value will be processed in sequence and not at the same time as other tasks with the same shared_resource value. If this value is set to 'EXCLUSIVE', the task will not be processed concurrently with any other task. | xxyy213.jpg or EXCLUSIVE |
| status | TEXT | The run status of this task. One of PENDING, RUNNING, DONE, FAILED. | PENDING |
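
Task parameters:

| Column | Type | Description |
|---|---|---|
| task_param_id | INT | Primary key. |
| key | | Parameter name. |
| val | | Parameter value. |
| task_id | | The task this parameter belongs to. |

Task log:

| Column | Type | Description |
|---|---|---|
| task_log_id | | |
| task_id | | The task this log entry belongs to. |
| log_time | | |
| log_type | | One of STARTED, COMPLETED, FAILED, REQUEUED. |
| message | | |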
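
Storing only a function name and a set of key/value parameters means Go closures never have to be serialised: the worker looks the name up in a registry of known functions at run time. A minimal sketch of that lookup follows, with the database left out entirely; `TaskFunc`, `Register`, `TaskRow` and `Execute` are hypothetical names, and anonymous tasks would not be persistable under this approach.

```go
package main

import "fmt"

// TaskFunc is the signature every registered task function shares.
type TaskFunc func(params map[string]string) error

// registry maps a persisted task_function_name to Go code.
var registry = map[string]TaskFunc{}

// Register makes a function available to queued tasks under the given name.
func Register(name string, fn TaskFunc) {
	registry[name] = fn
}

// TaskRow mirrors a few queue columns, plus the task's parameters.
type TaskRow struct {
	TaskID           int
	TaskFunctionName string
	Status           string
	Params           map[string]string
}

// Execute looks up the task's function by name, runs it, and updates the
// in-memory status to DONE or FAILED.
func Execute(row *TaskRow) error {
	fn, ok := registry[row.TaskFunctionName]
	if !ok {
		row.Status = "FAILED"
		return fmt.Errorf("no function registered as %q", row.TaskFunctionName)
	}
	row.Status = "RUNNING"
	if err := fn(row.Params); err != nil {
		row.Status = "FAILED"
		return err
	}
	row.Status = "DONE"
	return nil
}

func main() {
	Register("joinMultipartFiles", func(p map[string]string) error {
		if p["outfile"] == "" {
			return fmt.Errorf("outfile parameter is required")
		}
		return nil // the real join would happen here
	})

	row := &TaskRow{
		TaskID:           1,
		TaskFunctionName: "joinMultipartFiles",
		Status:           "PENDING",
		Params:           map[string]string{"outfile": "/x/y/z.jpg"},
	}
	err := Execute(row)
	fmt.Println(row.Status, err)
}
```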
### FuncLang - A script based option
FuncLang is a very basic language. It is basic more from the programmer's perspective (it should be easy to implement) than from the user's.
It has functions. It has values.
It has comments -- only /* these ones */
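
Because everything is either a function call or a value, a parser for a language like this can stay small. The sketch below is one possible reading of the syntax, not a specification: it parses a single expression into a tree, treats quoted text as strings and anything else (such as `X` or `90`) as a bare word, and skips `/* ... */` comments. `Node`, `Parse` and the `Kind` values are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Node is one parsed element: a function call, a quoted string, or a bare word.
type Node struct {
	Kind string  // "call", "string" or "word"
	Text string  // function name, string contents, or the word itself
	Args []*Node // arguments, only set when Kind is "call"
}

type parser struct {
	src string
	pos int
}

// skip moves past whitespace and /* ... */ comments.
func (p *parser) skip() {
	for p.pos < len(p.src) {
		if strings.HasPrefix(p.src[p.pos:], "/*") {
			end := strings.Index(p.src[p.pos+2:], "*/")
			if end < 0 {
				p.pos = len(p.src) // unterminated comment: ignore the rest
				return
			}
			p.pos += end + 4 // opening "/*", comment body, closing "*/"
		} else if strings.IndexByte(" \t\r\n", p.src[p.pos]) >= 0 {
			p.pos++
		} else {
			return
		}
	}
}

// parseExpr reads one string, bare word, or function call with its arguments.
func (p *parser) parseExpr() (*Node, error) {
	p.skip()
	if p.pos >= len(p.src) {
		return nil, fmt.Errorf("unexpected end of input")
	}
	start := p.pos
	if p.src[p.pos] == '"' {
		end := strings.IndexByte(p.src[p.pos+1:], '"')
		if end < 0 {
			return nil, fmt.Errorf("unterminated string at offset %d", start)
		}
		p.pos += end + 2 // both quotes plus the contents
		return &Node{Kind: "string", Text: p.src[start+1 : p.pos-1]}, nil
	}
	for p.pos < len(p.src) && strings.IndexByte("(),\"/ \t\r\n", p.src[p.pos]) < 0 {
		p.pos++
	}
	word := p.src[start:p.pos]
	if word == "" {
		return nil, fmt.Errorf("unexpected %q at offset %d", p.src[p.pos], p.pos)
	}
	p.skip()
	if p.pos >= len(p.src) || p.src[p.pos] != '(' {
		return &Node{Kind: "word", Text: word}, nil
	}
	p.pos++ // consume "("
	call := &Node{Kind: "call", Text: word}
	for {
		p.skip()
		if p.pos < len(p.src) && p.src[p.pos] == ')' {
			p.pos++
			return call, nil
		}
		if len(call.Args) > 0 {
			if p.pos >= len(p.src) || p.src[p.pos] != ',' {
				return nil, fmt.Errorf("expected , or ) at offset %d", p.pos)
			}
			p.pos++ // consume ","
		}
		arg, err := p.parseExpr()
		if err != nil {
			return nil, err
		}
		call.Args = append(call.Args, arg)
	}
}

// Parse parses the first expression in src and ignores anything after it.
func Parse(src string) (*Node, error) {
	p := &parser{src: src}
	return p.parseExpr()
}

func main() {
	tree, err := Parse(`ROTATE_BY(STREAM(X), 90 /* quarter turn */)`)
	if err != nil {
		panic(err)
	}
	fmt.Println(tree.Text, "takes", len(tree.Args), "arguments")
}
```

Evaluating the tree would then be a matter of walking it and dispatching each call name to a Go function.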
Example 1 - Applying a transformation to files meeting criteria. Saving a copy.
PARAMS(), FOREACH_DO(
X,
/* Rotate 90 deg and save as filename_rotated */
SAVE_TO_FILE(
ROTATE_BY(STREAM(X),90), /* What we want to save */
CONCAT(
FILEPATH_WITHOUT_EXTENSION(ATTR(X,"key")),
"_rotated",
EXTENSION(ATTR(X,"key")))
),
FILES_WHERE( /* Returns a list of files */
AND(
HAS_ALL_LABELS("foo"),
GREATER_THAN(SIZE(),10)
)
)
)
Example 2 - Join multipart file parts.
PARAMS()
SET_VAR(JOINED_FILE,NEW_FILE())
FOREACH_DO(
X,
DO(
SAVE_TO_FILE(
APPEND_FILES(X,JOINED_FILE),
"/multi/joined.part"
),
VERIFY_HASH("xxhash333","/multi/joined.part"),
UPLOAD("/multi/joined.part","/the/key.jpg"),
DELETE_FILE("/multi/joined.part")
),
LIST("/multi/123.part","/multi/234.part")
)
Example 3 - Create thumbnails for a file.
PARAMS(INPUT_FILE_STREAM,[ ])