yaze 0.3.2
Link to the Past ROM Editor
 
Loading...
Searching...
No Matches
wasm_worker_pool.h
Go to the documentation of this file.
1// clang-format off
2#ifndef YAZE_APP_PLATFORM_WASM_WORKER_POOL_H
3#define YAZE_APP_PLATFORM_WASM_WORKER_POOL_H
4
5#include <cstddef>
6#include <cstdint>
7#include <functional>
8#include <string>
9#include <vector>
10
11#ifdef __EMSCRIPTEN__
12#include <atomic>
13#include <condition_variable>
14#include <memory>
15#include <mutex>
16#include <queue>
17#include <thread>
18#include <unordered_map>
19#include <emscripten.h>
20#include <emscripten/threading.h>
21#endif
22
23namespace yaze {
24namespace app {
25namespace platform {
26namespace wasm {
27
44 public:
45 // Task types that can be processed in background
53
54 // Task priority levels, from least (kLow = 0) to most urgent (kCritical = 3).
// TaskCompare compares the underlying integer values, so a larger value is
// dequeued before a smaller one.
55 enum class Priority {
56 kLow = 0,
57 kNormal = 1,
58 kHigh = 2,
59 kCritical = 3
60 };
61
62 // Callback type for task completion: receives a success flag and the task's result bytes.
63 using TaskCallback = std::function<void(bool success, const std::vector<uint8_t>& result)>;
64
65 // Progress callback for long-running tasks: receives a progress value and a status message.
// NOTE(review): the range of `progress` is not visible here (presumably 0.0-1.0) - confirm.
66 using ProgressCallback = std::function<void(float progress, const std::string& message)>;
67
68 // Task structure: one unit of work handled by the pool.
// NOTE(review): this listing skips source lines 71-72 and 74-75, so some members are
// not shown here (TaskCompare reads a `priority` member, for example).
69 struct Task {
70 uint32_t id; // Task identifier; TaskCompare uses it as the tie-breaker within a priority.
73 std::vector<uint8_t> input_data; // Input bytes for the task.
76 std::string type_string; // For custom task types
77 bool cancelled = false; // Cancellation flag; a new task starts out not cancelled.
78 };
79
80 // Worker statistics, as returned (in a vector) by GetWorkerStats().
// NOTE(review): this listing skips source line 84, so one member is not shown here.
81 struct WorkerStats {
82 uint32_t tasks_completed = 0; // Counter, starts at zero.
83 uint32_t tasks_failed = 0; // Counter, starts at zero.
85 std::string current_task_type; // Presumably the type of the task being processed - confirm.
86 bool is_busy = false; // False by default.
87 };
88
89 // Special task ID (UINT32_MAX) returned when a task is executed synchronously (no workers available)
90 static constexpr uint32_t kSynchronousTaskId = UINT32_MAX;
91
// Web Worker pool for offloading CPU-intensive operations.
// NOTE(review): this single-argument constructor is not `explicit`, so a size_t
// converts implicitly to a WasmWorkerPool - consider marking it `explicit`.
92 WasmWorkerPool(size_t num_workers = 0); // 0 = auto-detect optimal count
94
95 // Initialize the worker pool. Returns bool (presumably true on success - confirm).
96 bool Initialize();
97
98 // Shut down the worker pool.
99 void Shutdown();
100
// Submit a task to the worker pool.
//   type       - which task type to run.
//   input_data - input bytes for the task.
//   callback   - completion callback (see TaskCallback).
//   priority   - scheduling priority; defaults to Priority::kNormal.
// Returns a uint32_t task ID (presumably kSynchronousTaskId when the task was run
// synchronously - confirm against the implementation).
110 uint32_t SubmitTask(TaskType type,
111 const std::vector<uint8_t>& input_data,
112 TaskCallback callback,
113 Priority priority = Priority::kNormal);
114
// Submit a custom task type.
//   type_string - string naming the custom task type.
//   input_data  - input bytes for the task.
//   callback    - completion callback (see TaskCallback).
//   priority    - scheduling priority; defaults to Priority::kNormal.
// Returns a uint32_t task ID.
124 uint32_t SubmitCustomTask(const std::string& type_string,
125 const std::vector<uint8_t>& input_data,
126 TaskCallback callback,
127 Priority priority = Priority::kNormal);
128
// Submit a task with progress reporting.
//   type                - which task type to run.
//   input_data          - input bytes for the task.
//   completion_callback - completion callback (see TaskCallback).
//   progress_callback   - progress callback (see ProgressCallback).
//   priority            - scheduling priority; defaults to Priority::kNormal.
// Returns a uint32_t task ID.
132 uint32_t SubmitTaskWithProgress(TaskType type,
133 const std::vector<uint8_t>& input_data,
134 TaskCallback completion_callback,
135 ProgressCallback progress_callback,
136 Priority priority = Priority::kNormal);
137
// Cancel a pending task, identified by its task ID.
// Returns bool (presumably true when the task was found and cancelled - confirm).
144 bool Cancel(uint32_t task_id);
145
// Cancel all pending tasks of a specific type.
149 void CancelAllOfType(TaskType type);
150
// Wait for all pending tasks to complete.
// NOTE(review): timeout_ms defaults to 0; presumably 0 means "no timeout" and the
// return value reports whether everything finished in time - confirm.
157 bool WaitAll(uint32_t timeout_ms = 0);
158
// Get the number of pending tasks.
162 size_t GetPendingCount() const;
163
// Get the number of active workers.
167 size_t GetActiveWorkerCount() const;
168
// Get statistics for all workers.
172 std::vector<WorkerStats> GetWorkerStats() const;
173
// Check if the worker pool is initialized (returns the initialized_ flag).
177 bool IsInitialized() const { return initialized_; }
178
// Set the maximum number of concurrent workers.
182 void SetMaxWorkers(size_t count);
183
// Process any pending callbacks on the main thread.
// Should be called periodically from the main loop.
188 void ProcessCallbacks();
189
190 private:
191 // Worker thread function; worker_id identifies the worker.
192 void WorkerThread(size_t worker_id);
193
194 // Process a single task on behalf of the worker identified by worker_id.
195 void ProcessTask(const Task& task, size_t worker_id);
196
197 // Execute task based on type; returns the result bytes.
198 std::vector<uint8_t> ExecuteTask(const Task& task);
199
200 // Task-specific processing functions. Each takes the task's input bytes and
// returns the result bytes. The byte formats are not visible in this header.
201 std::vector<uint8_t> ProcessRomDecompression(const std::vector<uint8_t>& input);
202 std::vector<uint8_t> ProcessGraphicsDecoding(const std::vector<uint8_t>& input);
203 std::vector<uint8_t> ProcessPaletteCalculation(const std::vector<uint8_t>& input);
204 std::vector<uint8_t> ProcessAsarCompilation(const std::vector<uint8_t>& input);
205
206 // Report progress (value plus message) for the given task from a worker thread
207 void ReportProgress(uint32_t task_id, float progress, const std::string& message);
208
209 // Queue a callback for execution on the main thread
// (presumably drained by ProcessCallbacks() - confirm).
210 void QueueCallback(std::function<void()> callback);
211
212#ifdef __EMSCRIPTEN__
213 // Emscripten-specific callback handler (only declared when __EMSCRIPTEN__ is defined).
// NOTE(review): what `arg` points to is not visible in this header - confirm.
214 static void MainThreadCallbackHandler(void* arg);
215#endif
216
217 // Member variables
// NOTE(review): both flags are plain `bool`. If worker threads read them without
// holding a mutex, they would need to be std::atomic<bool> - confirm.
218 bool initialized_ = false; // Returned by IsInitialized().
219 bool shutting_down_ = false; // Starts out false.
221
222#ifdef __EMSCRIPTEN__
223 std::atomic<uint32_t> next_task_id_{1}; // Task ID counter; starts at 1.
224
225 // Worker threads
226 std::vector<std::thread> workers_;
227 std::vector<WorkerStats> worker_stats_; // Per-worker statistics (presumably index-aligned with workers_ - confirm).
228
229 // Task queue (priority queue)
// TaskCompare is the ordering used by task_queue_. std::priority_queue puts the
// element that compares "greatest" at the top, so returning true here means that
// `a` is dequeued after `b`.
230 struct TaskCompare {
// The call operator is const so the comparator can be invoked through a const object.
231 bool operator()(const std::shared_ptr<Task>& a, const std::shared_ptr<Task>& b) const {
232 // Higher priority first, then lower ID (FIFO within priority, assuming IDs grow with submission order)
233 if (a->priority != b->priority) {
234 return static_cast<int>(a->priority) < static_cast<int>(b->priority);
235 }
236 return a->id > b->id;
237 }
238 };
239
// Queue of tasks, ordered by TaskCompare.
240 std::priority_queue<std::shared_ptr<Task>,
241 std::vector<std::shared_ptr<Task>>,
242 TaskCompare> task_queue_;
243
244 // Active tasks map (task_id -> task)
245 std::unordered_map<uint32_t, std::shared_ptr<Task>> active_tasks_;
246
247 // Synchronization
// NOTE(review): the mutex is `mutable`, presumably so const accessors such as
// GetPendingCount() can lock it - confirm. Which condition each condition
// variable signals is not visible in this header.
248 mutable std::mutex queue_mutex_;
249 std::condition_variable queue_cv_;
250 std::condition_variable completion_cv_;
251
252 // Callback queue for main thread execution
253 std::queue<std::function<void()>> callback_queue_;
254 mutable std::mutex callback_mutex_;
255
256 // Statistics (atomic counters, all starting at zero)
257 std::atomic<size_t> active_workers_{0};
258 std::atomic<size_t> total_tasks_submitted_{0};
259 std::atomic<size_t> total_tasks_completed_{0};
260#else
261 // Stub members for non-Emscripten builds (no threads, so a plain counter instead of an atomic)
// NOTE(review): this listing skips source lines 264-266, so further stub members may exist.
262 uint32_t next_task_id_{1};
263 std::vector<WorkerStats> worker_stats_;
267#endif
268};
269
270} // namespace wasm
271} // namespace platform
272} // namespace app
273} // namespace yaze
274
275#endif // YAZE_APP_PLATFORM_WASM_WORKER_POOL_H
Web Worker pool for offloading CPU-intensive operations.
std::vector< uint8_t > ExecuteTask(const Task &task)
std::vector< uint8_t > ProcessPaletteCalculation(const std::vector< uint8_t > &input)
size_t GetActiveWorkerCount() const
Get the number of active workers.
bool IsInitialized() const
Check if the worker pool is initialized.
std::vector< uint8_t > ProcessGraphicsDecoding(const std::vector< uint8_t > &input)
static constexpr uint32_t kSynchronousTaskId
bool WaitAll(uint32_t timeout_ms=0)
Wait for all pending tasks to complete.
void ProcessCallbacks()
Process any pending callbacks on the main thread. Should be called periodically from the main loop.
std::function< void(bool success, const std::vector< uint8_t > &result)> TaskCallback
uint32_t SubmitTaskWithProgress(TaskType type, const std::vector< uint8_t > &input_data, TaskCallback completion_callback, ProgressCallback progress_callback, Priority priority=Priority::kNormal)
Submit a task with progress reporting.
std::vector< uint8_t > ProcessAsarCompilation(const std::vector< uint8_t > &input)
std::vector< uint8_t > ProcessRomDecompression(const std::vector< uint8_t > &input)
std::function< void(float progress, const std::string &message)> ProgressCallback
uint32_t SubmitTask(TaskType type, const std::vector< uint8_t > &input_data, TaskCallback callback, Priority priority=Priority::kNormal)
Submit a task to the worker pool.
void QueueCallback(std::function< void()> callback)
bool Cancel(uint32_t task_id)
Cancel a pending task.
void CancelAllOfType(TaskType type)
Cancel all pending tasks of a specific type.
size_t GetPendingCount() const
Get the number of pending tasks.
void ReportProgress(uint32_t task_id, float progress, const std::string &message)
void ProcessTask(const Task &task, size_t worker_id)
void SetMaxWorkers(size_t count)
Set the maximum number of concurrent workers.
uint32_t SubmitCustomTask(const std::string &type_string, const std::vector< uint8_t > &input_data, TaskCallback callback, Priority priority=Priority::kNormal)
Submit a custom task type.
std::vector< WorkerStats > GetWorkerStats() const
Get statistics for all workers.