11#include <emscripten/bind.h>
12#include <emscripten/val.h>
14#include "absl/strings/str_format.h"
24size_t GetOptimalWorkerCount() {
28 if (typeof navigator !==
'undefined' && navigator.hardwareConcurrency) {
29 Module[
'_yaze_hardware_concurrency'] = navigator.hardwareConcurrency;
31 Module[
'_yaze_hardware_concurrency'] = 4;
36 int concurrency = EM_ASM_INT({
37 return Module[
'_yaze_hardware_concurrency'] || 4;
41 return std::max(2, std::min(8, concurrency / 2));
44 unsigned int hw_threads = std::thread::hardware_concurrency();
45 if (hw_threads == 0) hw_threads = 4;
46 return std::max(2u, std::min(8u, hw_threads / 2));
53 : num_workers_(num_workers == 0 ? GetOptimalWorkerCount() : num_workers) {
70 bool has_shared_array_buffer = EM_ASM_INT({
71 return typeof SharedArrayBuffer !==
'undefined';
74 if (!has_shared_array_buffer) {
75 std::cerr <<
"WasmWorkerPool: SharedArrayBuffer not available. "
76 <<
"Workers will run in degraded mode.\n";
86 console.log(
'WasmWorkerPool: Initializing with', $0,
'workers');
107 std::lock_guard<std::mutex> lock(queue_mutex_);
110 queue_cv_.notify_all();
113 for (
auto& worker : workers_) {
114 if (worker.joinable()) {
123 std::lock_guard<std::mutex> lock(queue_mutex_);
124 while (!task_queue_.empty()) {
127 active_tasks_.clear();
134 const std::vector<uint8_t>& input_data,
135 TaskCallback callback,
141 const std::vector<uint8_t>& input_data,
142 TaskCallback callback,
144 auto task = std::make_shared<Task>();
147 task->type_string = type_string;
148 task->priority = priority;
149 task->input_data = input_data;
150 task->completion_callback = callback;
153 std::lock_guard<std::mutex> lock(queue_mutex_);
154 active_tasks_[task->id] = task;
155 task_queue_.push(task);
159 queue_cv_.notify_one();
164 const std::vector<uint8_t>& input_data,
165 TaskCallback completion_callback,
166 ProgressCallback progress_callback,
170 if (completion_callback) {
172 auto task = std::make_shared<Task>();
174 task->input_data = input_data;
176 completion_callback(
true, result);
177 }
catch (
const std::exception& e) {
178 completion_callback(
false, std::vector<uint8_t>());
185 auto task = std::make_shared<Task>();
188 task->priority = priority;
189 task->input_data = input_data;
190 task->completion_callback = completion_callback;
191 task->progress_callback = progress_callback;
194 std::lock_guard<std::mutex> lock(queue_mutex_);
195 active_tasks_[task->id] = task;
196 task_queue_.push(task);
200 queue_cv_.notify_one();
205 std::lock_guard<std::mutex> lock(queue_mutex_);
207 auto it = active_tasks_.find(task_id);
208 if (it != active_tasks_.end()) {
209 it->second->cancelled =
true;
217 std::lock_guard<std::mutex> lock(queue_mutex_);
219 for (
auto& [
id, task] : active_tasks_) {
220 if (task->type == type) {
221 task->cancelled =
true;
227 auto start = std::chrono::steady_clock::now();
231 std::lock_guard<std::mutex> lock(queue_mutex_);
237 if (timeout_ms > 0) {
238 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
239 std::chrono::steady_clock::now() - start).count();
240 if (elapsed >= timeout_ms) {
245 std::unique_lock<std::mutex> lock(queue_mutex_);
246 completion_cv_.wait_for(lock, std::chrono::milliseconds(100));
251 std::lock_guard<std::mutex> lock(queue_mutex_);
252 return task_queue_.size();
260 std::lock_guard<std::mutex> lock(queue_mutex_);
273 std::queue<std::function<void()>> callbacks_to_process;
276 std::lock_guard<std::mutex> lock(callback_mutex_);
277 callbacks_to_process.swap(callback_queue_);
280 while (!callbacks_to_process.empty()) {
281 callbacks_to_process.front()();
282 callbacks_to_process.pop();
289 emscripten_set_thread_name(pthread_self(),
290 absl::StrFormat(
"YazeWorker%zu", worker_id).c_str());
294 std::shared_ptr<Task> task;
298 std::unique_lock<std::mutex> lock(queue_mutex_);
299 queue_cv_.wait(lock, [
this] {
307 if (!task_queue_.empty()) {
308 task = task_queue_.top();
314 absl::StrFormat(
"Type%d",
static_cast<int>(task->type));
318 if (task && !task->cancelled) {
325 std::lock_guard<std::mutex> lock(queue_mutex_);
326 active_tasks_.erase(task->id);
331 completion_cv_.notify_all();
337 auto start_time = std::chrono::steady_clock::now();
338 bool success =
false;
339 std::vector<uint8_t> result;
343 if (task.progress_callback) {
355 if (task.progress_callback) {
358 }
catch (
const std::exception& e) {
359 std::cerr <<
"Worker " << worker_id <<
" task failed: " << e.what() << std::endl;
364 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
365 std::chrono::steady_clock::now() - start_time).count();
366 worker_stats_[worker_id].total_processing_time_ms += elapsed;
369 if (task.completion_callback && !task.cancelled) {
370 QueueCallback([callback = task.completion_callback, success, result]() {
371 callback(success, result);
395 return task.input_data;
398 throw std::runtime_error(
"Unknown task type");
408 std::vector<uint8_t> result;
409 result.reserve(input.size() * 2);
412 for (
size_t i = 0; i < input.size(); ++i) {
413 result.push_back(input[i]);
414 result.push_back(input[i] ^ 0xFF);
418 float progress =
static_cast<float>(i) / input.size();
430 std::vector<uint8_t> result;
431 result.reserve(input.size());
434 for (uint8_t
byte : input) {
436 result.push_back((
byte << 1) | (
byte >> 7));
446 std::vector<uint8_t> result;
449 for (
size_t i = 0; i + 1 < input.size(); i += 2) {
450 uint16_t
snes_color = (input[i + 1] << 8) | input[i];
471 return std::vector<uint8_t>();
476 std::shared_ptr<Task> task;
478 std::lock_guard<std::mutex> lock(queue_mutex_);
479 auto it = active_tasks_.find(task_id);
480 if (it != active_tasks_.end()) {
485 if (task && task->progress_callback && !task->cancelled) {
486 QueueCallback([callback = task->progress_callback, progress, message]() {
487 callback(progress, message);
497 auto* callback_ptr =
new std::function<void()>(std::move(callback));
499 emscripten_async_run_in_main_runtime_thread(
501 &WasmWorkerPool::MainThreadCallbackHandler,
505 std::lock_guard<std::mutex> lock(callback_mutex_);
506 callback_queue_.push(callback);
511void WasmWorkerPool::MainThreadCallbackHandler(
void* arg) {
512 auto* callback_ptr =
static_cast<std::function<
void()
>*>(arg);
542 const std::vector<uint8_t>& input_data,
547 callback(
false, std::vector<uint8_t>());
553 const std::vector<uint8_t>& input_data,
557 callback(
false, std::vector<uint8_t>());
563 const std::vector<uint8_t>& input_data,
567 if (completion_callback) {
568 completion_callback(
false, std::vector<uint8_t>());
591 if (callback) callback();
SNES color in 15-bit RGB format (BGR555)