yaze 0.3.2
Link to the Past ROM Editor
 
Loading...
Searching...
No Matches
wasm_worker_pool.cc
Go to the documentation of this file.
1// clang-format off
2#ifdef __EMSCRIPTEN__
3
5
6#include <algorithm>
7#include <chrono>
8#include <cstring>
9#include <iostream>
10
11#include <emscripten/bind.h>
12#include <emscripten/val.h>
13
14#include "absl/strings/str_format.h"
15
16namespace yaze {
17namespace app {
18namespace platform {
19namespace wasm {
20
21namespace {
22
23// Get optimal worker count based on hardware
/// Chooses the default number of worker threads for the pool.
///
/// The result is half of the reported hardware thread count, kept inside the
/// range [2, 8]. When the platform reports nothing usable, 4 hardware threads
/// are assumed.
size_t GetOptimalWorkerCount() {
#ifdef __EMSCRIPTEN__
  // Record the browser's core count on Module (4 when navigator or
  // hardwareConcurrency is missing)...
  EM_ASM({
    if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
      Module['_yaze_hardware_concurrency'] = navigator.hardwareConcurrency;
    } else {
      Module['_yaze_hardware_concurrency'] = 4; // Default fallback
    }
  });

  // ...and read the recorded value back into C++.
  const int reported_cores = EM_ASM_INT({
    return Module['_yaze_hardware_concurrency'] || 4;
  });

  // Half of the cores, at most 8, at least 2.
  const int capped = std::min(8, reported_cores / 2);
  return std::max(2, capped);
#else
  // Native platform: 0 means "unknown", so fall back to 4.
  unsigned int reported_cores = std::thread::hardware_concurrency();
  if (reported_cores == 0) {
    reported_cores = 4;
  }
  const unsigned int capped = std::min(8u, reported_cores / 2);
  return std::max(2u, capped);
#endif
}
49
50} // namespace
51
// Constructs the pool and fixes the worker count.
// Passing num_workers == 0 requests automatic sizing: num_workers_ is then
// taken from GetOptimalWorkerCount(); any other value is used as given.
52WasmWorkerPool::WasmWorkerPool(size_t num_workers)
53 : num_workers_(num_workers == 0 ? GetOptimalWorkerCount() : num_workers) {
55}
56
58 if (initialized_) {
59 Shutdown();
60 }
61}
62
64 if (initialized_) {
65 return true;
66 }
67
68#ifdef __EMSCRIPTEN__
69 // Check if SharedArrayBuffer is available (required for pthreads)
70 bool has_shared_array_buffer = EM_ASM_INT({
71 return typeof SharedArrayBuffer !== 'undefined';
72 });
73
74 if (!has_shared_array_buffer) {
75 std::cerr << "WasmWorkerPool: SharedArrayBuffer not available. "
76 << "Workers will run in degraded mode.\n";
77 // Could fall back to single-threaded mode or use postMessage-based workers
78 // For now, we'll proceed but with reduced functionality
79 num_workers_ = 0;
80 initialized_ = true;
81 return true;
82 }
83
84 // Log initialization
85 EM_ASM({
86 console.log('WasmWorkerPool: Initializing with', $0, 'workers');
87 }, num_workers_);
88#endif
89
90 // Create worker threads
91 workers_.reserve(num_workers_);
92 for (size_t i = 0; i < num_workers_; ++i) {
93 workers_.emplace_back(&WasmWorkerPool::WorkerThread, this, i);
94 }
95
96 initialized_ = true;
97 return true;
98}
99
101 if (!initialized_) {
102 return;
103 }
104
105 // Signal shutdown
106 {
107 std::lock_guard<std::mutex> lock(queue_mutex_);
108 shutting_down_ = true;
109 }
110 queue_cv_.notify_all();
111
112 // Wait for all workers to finish
113 for (auto& worker : workers_) {
114 if (worker.joinable()) {
115 worker.join();
116 }
117 }
118
119 workers_.clear();
120
121 // Clear any remaining tasks
122 {
123 std::lock_guard<std::mutex> lock(queue_mutex_);
124 while (!task_queue_.empty()) {
125 task_queue_.pop();
126 }
127 active_tasks_.clear();
128 }
129
130 initialized_ = false;
131}
132
133uint32_t WasmWorkerPool::SubmitTask(TaskType type,
134 const std::vector<uint8_t>& input_data,
135 TaskCallback callback,
136 Priority priority) {
137 return SubmitTaskWithProgress(type, input_data, callback, nullptr, priority);
138}
139
// Queues a task of type TaskType::kCustom that is identified by type_string.
//
// Parameters:
//   type_string - label stored on the task as task->type_string.
//   input_data  - bytes copied into the task.
//   callback    - stored as the task's completion callback.
//   priority    - stored as the task's priority.
// Returns the id assigned to the new task.
//
// NOTE(review): unlike SubmitTaskWithProgress, this function has no
// synchronous fallback for num_workers_ == 0 or !initialized_. In that state
// the task is queued but no worker thread exists to pop it - confirm whether
// that is intended.
140uint32_t WasmWorkerPool::SubmitCustomTask(const std::string& type_string,
141 const std::vector<uint8_t>& input_data,
142 TaskCallback callback,
143 Priority priority) {
// Build the task record before taking the lock.
144 auto task = std::make_shared<Task>();
145 task->id = next_task_id_++;
146 task->type = TaskType::kCustom;
147 task->type_string = type_string;
148 task->priority = priority;
149 task->input_data = input_data;
150 task->completion_callback = callback;
151
// Register the task in active_tasts lookup and enqueue it under queue_mutex_.
152 {
153 std::lock_guard<std::mutex> lock(queue_mutex_);
154 active_tasks_[task->id] = task;
155 task_queue_.push(task);
157 }
158
// Wake one waiter on queue_cv_ now that the lock is released.
159 queue_cv_.notify_one();
160 return task->id;
161}
162
// Submits a task with both a completion callback and a progress callback.
//
// Parameters:
//   type                - kind of work to run.
//   input_data          - bytes copied into the task.
//   completion_callback - invoked with (success, result).
//   progress_callback   - stored on the task for progress reports.
//   priority            - stored as the task's priority.
// Returns the new task id, or kSynchronousTaskId when the work was run
// inline because no workers are available.
163uint32_t WasmWorkerPool::SubmitTaskWithProgress(TaskType type,
164 const std::vector<uint8_t>& input_data,
165 TaskCallback completion_callback,
166 ProgressCallback progress_callback,
167 Priority priority) {
168 // If no workers available, execute synchronously
169 if (num_workers_ == 0 || !initialized_) {
// The inline path only runs the task when there is a completion callback
// to receive the result; progress_callback and priority are not used here.
170 if (completion_callback) {
171 try {
172 auto task = std::make_shared<Task>();
173 task->type = type;
174 task->input_data = input_data;
175 auto result = ExecuteTask(*task);
176 completion_callback(true, result);
177 } catch (const std::exception& e) {
// A std::exception from the try block is reported as a failed task with an
// empty result.
178 completion_callback(false, std::vector<uint8_t>());
179 }
180 }
181 // Return special ID to indicate synchronous execution (task already completed)
182 return kSynchronousTaskId;
183 }
184
// Asynchronous path: build the task record before taking the lock.
185 auto task = std::make_shared<Task>();
186 task->id = next_task_id_++;
187 task->type = type;
188 task->priority = priority;
189 task->input_data = input_data;
190 task->completion_callback = completion_callback;
191 task->progress_callback = progress_callback;
192
// Register and enqueue the task under queue_mutex_.
193 {
194 std::lock_guard<std::mutex> lock(queue_mutex_);
195 active_tasks_[task->id] = task;
196 task_queue_.push(task);
198 }
199
// Wake one waiter on queue_cv_ now that the lock is released.
200 queue_cv_.notify_one();
201 return task->id;
202}
203
204bool WasmWorkerPool::Cancel(uint32_t task_id) {
205 std::lock_guard<std::mutex> lock(queue_mutex_);
206
207 auto it = active_tasks_.find(task_id);
208 if (it != active_tasks_.end()) {
209 it->second->cancelled = true;
210 return true;
211 }
212
213 return false;
214}
215
216void WasmWorkerPool::CancelAllOfType(TaskType type) {
217 std::lock_guard<std::mutex> lock(queue_mutex_);
218
219 for (auto& [id, task] : active_tasks_) {
220 if (task->type == type) {
221 task->cancelled = true;
222 }
223 }
224}
225
226bool WasmWorkerPool::WaitAll(uint32_t timeout_ms) {
227 auto start = std::chrono::steady_clock::now();
228
229 while (true) {
230 {
231 std::lock_guard<std::mutex> lock(queue_mutex_);
232 if (task_queue_.empty() && active_workers_ == 0) {
233 return true;
234 }
235 }
236
237 if (timeout_ms > 0) {
238 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
239 std::chrono::steady_clock::now() - start).count();
240 if (elapsed >= timeout_ms) {
241 return false;
242 }
243 }
244
245 std::unique_lock<std::mutex> lock(queue_mutex_);
246 completion_cv_.wait_for(lock, std::chrono::milliseconds(100));
247 }
248}
249
250size_t WasmWorkerPool::GetPendingCount() const {
251 std::lock_guard<std::mutex> lock(queue_mutex_);
252 return task_queue_.size();
253}
254
256 return active_workers_.load();
257}
258
259std::vector<WasmWorkerPool::WorkerStats> WasmWorkerPool::GetWorkerStats() const {
260 std::lock_guard<std::mutex> lock(queue_mutex_);
261 return worker_stats_;
262}
263
264void WasmWorkerPool::SetMaxWorkers(size_t count) {
265 // This would require stopping and restarting workers
266 // For simplicity, we'll just store the value for next initialization
267 if (!initialized_) {
268 num_workers_ = count;
269 }
270}
271
273 std::queue<std::function<void()>> callbacks_to_process;
274
275 {
276 std::lock_guard<std::mutex> lock(callback_mutex_);
277 callbacks_to_process.swap(callback_queue_);
278 }
279
280 while (!callbacks_to_process.empty()) {
281 callbacks_to_process.front()();
282 callbacks_to_process.pop();
283 }
284}
285
// Main loop of one worker thread.
//
// worker_id indexes this thread's slot in worker_stats_ and is used in the
// thread's debug name. The loop pops tasks from task_queue_, runs them with
// ProcessTask(), and exits once shutting_down_ is set and the queue is empty.
286void WasmWorkerPool::WorkerThread(size_t worker_id) {
287#ifdef __EMSCRIPTEN__
288 // Set thread name for debugging
289 emscripten_set_thread_name(pthread_self(),
290 absl::StrFormat("YazeWorker%zu", worker_id).c_str());
291#endif
292
293 while (true) {
294 std::shared_ptr<Task> task;
295
296 // Get next task from queue
297 {
298 std::unique_lock<std::mutex> lock(queue_mutex_);
// Sleep until there is work or shutdown has been requested.
299 queue_cv_.wait(lock, [this] {
300 return shutting_down_ || !task_queue_.empty();
301 });
302
// Tasks still queued at shutdown are taken before the thread exits.
303 if (shutting_down_ && task_queue_.empty()) {
304 break;
305 }
306
307 if (!task_queue_.empty()) {
308 task = task_queue_.top();
309 task_queue_.pop();
// Publish what this worker is doing: custom tasks show their type_string,
// other tasks show "Type<N>" built from the numeric enum value.
311 worker_stats_[worker_id].is_busy = true;
312 worker_stats_[worker_id].current_task_type =
313 task->type == TaskType::kCustom ? task->type_string :
314 absl::StrFormat("Type%d", static_cast<int>(task->type));
315 }
316 }
317
// The task runs outside the lock. A task already flagged as cancelled is
// skipped, so ProcessTask() is not called for it.
318 if (task && !task->cancelled) {
319 ProcessTask(*task, worker_id);
320 }
321
322 // Clean up
323 if (task) {
324 {
325 std::lock_guard<std::mutex> lock(queue_mutex_);
326 active_tasks_.erase(task->id);
328 worker_stats_[worker_id].is_busy = false;
329 worker_stats_[worker_id].current_task_type.clear();
330 }
// Notify waiters on completion_cv_ after the lock is released.
331 completion_cv_.notify_all();
332 }
333 }
334}
335
// Runs one task on the calling worker and queues its completion callback.
//
// Parameters:
//   task      - the task to execute.
//   worker_id - index of the calling worker's slot in worker_stats_.
//
// NOTE(review): worker_stats_ is updated here without holding queue_mutex_,
// while GetWorkerStats() copies it under that mutex. Unless the counters are
// atomic this looks like a data race - confirm against the WorkerStats
// declaration.
336void WasmWorkerPool::ProcessTask(const Task& task, size_t worker_id) {
337 auto start_time = std::chrono::steady_clock::now();
338 bool success = false;
339 std::vector<uint8_t> result;
340
341 try {
342 // Report starting
343 if (task.progress_callback) {
344 ReportProgress(task.id, 0.0f, "Starting task...");
345 }
346
347 // Execute the task
348 result = ExecuteTask(task);
349 success = true;
350
351 // Update stats
352 worker_stats_[worker_id].tasks_completed++;
353
354 // Report completion
355 if (task.progress_callback) {
356 ReportProgress(task.id, 1.0f, "Task completed");
357 }
358 } catch (const std::exception& e) {
// A std::exception from the try block is logged and counted as a failure;
// the completion callback below then receives success == false.
359 std::cerr << "Worker " << worker_id << " task failed: " << e.what() << std::endl;
360 worker_stats_[worker_id].tasks_failed++;
361 success = false;
362 }
363
// Processing time is accumulated for successful and failed runs alike.
364 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
365 std::chrono::steady_clock::now() - start_time).count();
366 worker_stats_[worker_id].total_processing_time_ms += elapsed;
367
368 // Queue callback for main thread execution
// The callback, flag and result are captured by value, so the queued
// closure does not refer back to `task`. Cancelled tasks get no callback.
369 if (task.completion_callback && !task.cancelled) {
370 QueueCallback([callback = task.completion_callback, success, result]() {
371 callback(success, result);
372 });
373 }
374
376}
377
// Dispatches a task to the processing routine for its type and returns the
// produced bytes. Throws std::runtime_error("Unknown task type") when
// task.type matches none of the handled cases.
//
// NOTE(review): the case labels in front of the return statements below are
// not visible here; presumably each one names the TaskType that corresponds
// to the routine it calls - verify against the TaskType enum.
378std::vector<uint8_t> WasmWorkerPool::ExecuteTask(const Task& task) {
379 switch (task.type) {
381 return ProcessRomDecompression(task.input_data);
382
384 return ProcessGraphicsDecoding(task.input_data);
385
387 return ProcessPaletteCalculation(task.input_data);
388
390 return ProcessAsarCompilation(task.input_data);
391
393 // For custom tasks, just return the input as we don't know how to process it
394 // Real implementation would need a registry of custom processors
395 return task.input_data;
396
397 default:
398 throw std::runtime_error("Unknown task type");
399 }
400}
401
402std::vector<uint8_t> WasmWorkerPool::ProcessRomDecompression(const std::vector<uint8_t>& input) {
403 // Placeholder for LC-LZ2 decompression
404 // In real implementation, this would call the actual decompression routine
405 // from src/app/gfx/compression.cc
406
407 // For now, simulate some work
408 std::vector<uint8_t> result;
409 result.reserve(input.size() * 2); // Assume 2x expansion
410
411 // Simulate decompression (just duplicate data for testing)
412 for (size_t i = 0; i < input.size(); ++i) {
413 result.push_back(input[i]);
414 result.push_back(input[i] ^ 0xFF); // Inverted copy
415
416 // Simulate progress reporting
417 if (i % 1000 == 0) {
418 float progress = static_cast<float>(i) / input.size();
419 // Would call ReportProgress here if we had the task ID
420 }
421 }
422
423 return result;
424}
425
426std::vector<uint8_t> WasmWorkerPool::ProcessGraphicsDecoding(const std::vector<uint8_t>& input) {
427 // Placeholder for graphics sheet decoding
428 // In real implementation, this would decode SNES tile formats
429
430 std::vector<uint8_t> result;
431 result.reserve(input.size());
432
433 // Simulate processing
434 for (uint8_t byte : input) {
435 // Simple transformation to simulate work
436 result.push_back((byte << 1) | (byte >> 7));
437 }
438
439 return result;
440}
441
442std::vector<uint8_t> WasmWorkerPool::ProcessPaletteCalculation(const std::vector<uint8_t>& input) {
443 // Placeholder for palette calculations
444 // In real implementation, this would process SNES color formats
445
446 std::vector<uint8_t> result;
447
448 // Process in groups of 2 bytes (SNES color format)
449 for (size_t i = 0; i + 1 < input.size(); i += 2) {
450 uint16_t snes_color = (input[i + 1] << 8) | input[i];
451
452 // Extract RGB components (5 bits each)
453 uint8_t r = (snes_color & 0x1F) << 3;
454 uint8_t g = ((snes_color >> 5) & 0x1F) << 3;
455 uint8_t b = ((snes_color >> 10) & 0x1F) << 3;
456
457 // Store as RGB24
458 result.push_back(r);
459 result.push_back(g);
460 result.push_back(b);
461 }
462
463 return result;
464}
465
466std::vector<uint8_t> WasmWorkerPool::ProcessAsarCompilation(const std::vector<uint8_t>& input) {
467 // Placeholder for Asar assembly compilation
468 // In real implementation, this would call the Asar wrapper
469
470 // For now, return empty result (compilation succeeded with no output)
471 return std::vector<uint8_t>();
472}
473
474void WasmWorkerPool::ReportProgress(uint32_t task_id, float progress, const std::string& message) {
475 // Find the task
476 std::shared_ptr<Task> task;
477 {
478 std::lock_guard<std::mutex> lock(queue_mutex_);
479 auto it = active_tasks_.find(task_id);
480 if (it != active_tasks_.end()) {
481 task = it->second;
482 }
483 }
484
485 if (task && task->progress_callback && !task->cancelled) {
486 QueueCallback([callback = task->progress_callback, progress, message]() {
487 callback(progress, message);
488 });
489 }
490}
491
492void WasmWorkerPool::QueueCallback(std::function<void()> callback) {
493#ifdef __EMSCRIPTEN__
494 // In Emscripten, we need to execute callbacks on the main thread
495 // Use emscripten_async_run_in_main_runtime_thread for thread safety
496
497 auto* callback_ptr = new std::function<void()>(std::move(callback));
498
499 emscripten_async_run_in_main_runtime_thread(
500 EM_FUNC_SIG_VI,
501 &WasmWorkerPool::MainThreadCallbackHandler,
502 callback_ptr);
503#else
504 // For non-Emscripten builds, just queue for later processing
505 std::lock_guard<std::mutex> lock(callback_mutex_);
506 callback_queue_.push(callback);
507#endif
508}
509
510#ifdef __EMSCRIPTEN__
511void WasmWorkerPool::MainThreadCallbackHandler(void* arg) {
512 auto* callback_ptr = static_cast<std::function<void()>*>(arg);
513 if (callback_ptr) {
514 (*callback_ptr)();
515 delete callback_ptr;
516 }
517}
518#endif
519
520} // namespace wasm
521} // namespace platform
522} // namespace app
523} // namespace yaze
524
525#else // !__EMSCRIPTEN__
526
527// Stub implementation for non-Emscripten builds
529
530namespace yaze {
531namespace app {
532namespace platform {
533namespace wasm {
534
535WasmWorkerPool::WasmWorkerPool(size_t num_workers) : num_workers_(0) {}
537
538bool WasmWorkerPool::Initialize() { return false; }
540
542 const std::vector<uint8_t>& input_data,
543 TaskCallback callback,
544 Priority priority) {
545 // No-op in non-WASM builds
546 if (callback) {
547 callback(false, std::vector<uint8_t>());
548 }
549 return 0;
550}
551
552uint32_t WasmWorkerPool::SubmitCustomTask(const std::string& type_string,
553 const std::vector<uint8_t>& input_data,
554 TaskCallback callback,
555 Priority priority) {
556 if (callback) {
557 callback(false, std::vector<uint8_t>());
558 }
559 return 0;
560}
561
563 const std::vector<uint8_t>& input_data,
564 TaskCallback completion_callback,
565 ProgressCallback progress_callback,
566 Priority priority) {
567 if (completion_callback) {
568 completion_callback(false, std::vector<uint8_t>());
569 }
570 return 0;
571}
572
573bool WasmWorkerPool::Cancel(uint32_t task_id) { return false; }
575bool WasmWorkerPool::WaitAll(uint32_t timeout_ms) { return true; }
576size_t WasmWorkerPool::GetPendingCount() const { return 0; }
577size_t WasmWorkerPool::GetActiveWorkerCount() const { return 0; }
578std::vector<WasmWorkerPool::WorkerStats> WasmWorkerPool::GetWorkerStats() const { return {}; }
579void WasmWorkerPool::SetMaxWorkers(size_t count) {}
581
582void WasmWorkerPool::WorkerThread(size_t worker_id) {}
583void WasmWorkerPool::ProcessTask(const Task& task, size_t worker_id) {}
584std::vector<uint8_t> WasmWorkerPool::ExecuteTask(const Task& task) { return {}; }
585std::vector<uint8_t> WasmWorkerPool::ProcessRomDecompression(const std::vector<uint8_t>& input) { return {}; }
586std::vector<uint8_t> WasmWorkerPool::ProcessGraphicsDecoding(const std::vector<uint8_t>& input) { return {}; }
587std::vector<uint8_t> WasmWorkerPool::ProcessPaletteCalculation(const std::vector<uint8_t>& input) { return {}; }
588std::vector<uint8_t> WasmWorkerPool::ProcessAsarCompilation(const std::vector<uint8_t>& input) { return {}; }
589void WasmWorkerPool::ReportProgress(uint32_t task_id, float progress, const std::string& message) {}
590void WasmWorkerPool::QueueCallback(std::function<void()> callback) {
591 if (callback) callback();
592}
593
594} // namespace wasm
595} // namespace platform
596} // namespace app
597} // namespace yaze
598
599#endif // __EMSCRIPTEN__
std::vector< uint8_t > ExecuteTask(const Task &task)
std::vector< uint8_t > ProcessPaletteCalculation(const std::vector< uint8_t > &input)
size_t GetActiveWorkerCount() const
Get the number of active workers.
std::vector< uint8_t > ProcessGraphicsDecoding(const std::vector< uint8_t > &input)
static constexpr uint32_t kSynchronousTaskId
bool WaitAll(uint32_t timeout_ms=0)
Wait for all pending tasks to complete.
void ProcessCallbacks()
Process any pending callbacks on the main thread. Should be called periodically from the main loop.
std::function< void(bool success, const std::vector< uint8_t > &result)> TaskCallback
uint32_t SubmitTaskWithProgress(TaskType type, const std::vector< uint8_t > &input_data, TaskCallback completion_callback, ProgressCallback progress_callback, Priority priority=Priority::kNormal)
Submit a task with progress reporting.
std::vector< uint8_t > ProcessAsarCompilation(const std::vector< uint8_t > &input)
std::vector< uint8_t > ProcessRomDecompression(const std::vector< uint8_t > &input)
std::function< void(float progress, const std::string &message)> ProgressCallback
uint32_t SubmitTask(TaskType type, const std::vector< uint8_t > &input_data, TaskCallback callback, Priority priority=Priority::kNormal)
Submit a task to the worker pool.
void QueueCallback(std::function< void()> callback)
bool Cancel(uint32_t task_id)
Cancel a pending task.
void CancelAllOfType(TaskType type)
Cancel all pending tasks of a specific type.
size_t GetPendingCount() const
Get the number of pending tasks.
void ReportProgress(uint32_t task_id, float progress, const std::string &message)
void ProcessTask(const Task &task, size_t worker_id)
void SetMaxWorkers(size_t count)
Set the maximum number of concurrent workers.
uint32_t SubmitCustomTask(const std::string &type_string, const std::vector< uint8_t > &input_data, TaskCallback callback, Priority priority=Priority::kNormal)
Submit a custom task type.
std::vector< WorkerStats > GetWorkerStats() const
Get statistics for all workers.
SNES color in 15-bit RGB format (BGR555)
Definition yaze.h:218