10#include "absl/strings/str_format.h"
// mq_save_queue(key, json_data): JavaScript-side helper (EM_JS) that stores a
// string in IndexedDB.
//
// Behaviour shown by the lines below:
//   - The body runs under Asyncify.handleAsync, so the C++ caller waits for
//     the returned promise.
//   - Both arguments are decoded with UTF8ToString before the asynchronous
//     operation is built.
//   - The database 'YazeMessageQueue' (version 1) is opened; on upgrade the
//     'queues' object store is created if it does not exist yet.
//   - json_data is written under `key` via store.put(value, key) inside a
//     'readwrite' transaction.
//   - Open failures, put failures and caught exceptions are reported through
//     console.error.
//   - When window.yazeAsyncQueue exists, the operation is returned through
//     window.yazeAsyncQueue.enqueue(operation).
//
// NOTE(review): the resolve(...) calls and the path taken when
// window.yazeAsyncQueue is absent are not among the lines shown here, so the
// exact int reported for success and failure should be confirmed against the
// full function.
18EM_JS(
int, mq_save_queue, (
const char* key,
const char* json_data), {
19 return Asyncify.handleAsync(function() {
// Decode the C strings into JS strings up front.
20 var keyStr = UTF8ToString(key);
21 var jsonStr = UTF8ToString(json_data);
22 var operation = function() {
23 return new Promise(function(resolve) {
26 var request = indexedDB.open(
'YazeMessageQueue', 1);
28 request.onerror = function() {
29 console.error(
'Failed to open message queue database:', request.error);
// Database creation or version change: make sure the object store exists.
33 request.onupgradeneeded = function(event) {
34 var db =
event.target.result;
35 if (!db.objectStoreNames.contains(
'queues')) {
36 db.createObjectStore(
'queues');
40 request.onsuccess = function() {
41 var db = request.result;
42 var transaction = db.transaction([
'queues'],
'readwrite');
43 var store = transaction.objectStore(
'queues');
// put(value, key): the store is created without a keyPath, so the key is
// supplied explicitly as the second argument.
44 var putRequest = store.put(jsonStr, keyStr);
46 putRequest.onsuccess = function() {
51 putRequest.onerror = function() {
52 console.error(
'Failed to save message queue:', putRequest.error);
58 console.error(
'Exception in mq_save_queue:', e);
// Route the operation through the page-provided async queue when present.
63 if (window.yazeAsyncQueue) {
64 return window.yazeAsyncQueue.enqueue(operation);
// mq_load_queue(key): JavaScript-side helper (EM_JS) that reads a string from
// IndexedDB and copies it into WASM heap memory.
//
// Behaviour shown by the lines below:
//   - The body runs under Asyncify.handleAsync, so the C++ caller waits for
//     the returned promise.
//   - The database 'YazeMessageQueue' (version 1) is opened; on upgrade the
//     'queues' object store is created if it does not exist yet.
//   - The value stored under `key` is fetched with store.get inside a
//     'readonly' transaction.
//   - Only a truthy value of type 'string' is copied: a buffer of
//     lengthBytesUTF8(result) + 1 bytes is obtained from Module._malloc and
//     filled with stringToUTF8.
//   - Open failures, get failures and caught exceptions are reported through
//     console.error.
//
// NOTE(review): the resolve(...) calls are not among the lines shown here.
// Presumably the malloc'ed pointer is what the function returns on success
// (and a null pointer otherwise), in which case the C++ caller owns the
// buffer and must free it — confirm against the full function.
70EM_JS(
char*, mq_load_queue, (
const char* key), {
71 return Asyncify.handleAsync(function() {
72 var keyStr = UTF8ToString(key);
73 var operation = function() {
74 return new Promise(function(resolve) {
76 var request = indexedDB.open(
'YazeMessageQueue', 1);
78 request.onerror = function() {
79 console.error(
'Failed to open message queue database:', request.error);
// Database creation or version change: make sure the object store exists.
83 request.onupgradeneeded = function(event) {
84 var db =
event.target.result;
85 if (!db.objectStoreNames.contains(
'queues')) {
86 db.createObjectStore(
'queues');
90 request.onsuccess = function() {
91 var db = request.result;
92 var transaction = db.transaction([
'queues'],
'readonly');
93 var store = transaction.objectStore(
'queues');
94 var getRequest = store.get(keyStr);
96 getRequest.onsuccess = function() {
97 var result = getRequest.result;
// Missing keys (undefined), empty strings and non-string values are not copied.
100 if (result && typeof result ===
'string') {
// +1 leaves room for the terminating NUL that stringToUTF8 writes.
101 var len = lengthBytesUTF8(result) + 1;
102 var ptr = Module._malloc(len);
103 stringToUTF8(result, ptr, len);
110 getRequest.onerror = function() {
111 console.error(
'Failed to load message queue:', getRequest.error);
117 console.error(
'Exception in mq_load_queue:', e);
// Route the operation through the page-provided async queue when present.
122 if (window.yazeAsyncQueue) {
123 return window.yazeAsyncQueue.enqueue(operation);
// mq_clear_queue(key): JavaScript-side helper (EM_JS) that removes one entry
// from IndexedDB.
//
// Behaviour shown by the lines below:
//   - The body runs under Asyncify.handleAsync, so the C++ caller waits for
//     the returned promise.
//   - The database 'YazeMessageQueue' (version 1) is opened and the entry
//     stored under `key` is removed with store.delete inside a 'readwrite'
//     transaction on the 'queues' object store.
//   - Open failures, delete failures and caught exceptions are reported
//     through console.error.
//
// NOTE(review): no onupgradeneeded handler appears among the lines shown
// here, unlike mq_save_queue and mq_load_queue. If this is the first call to
// open the database, the 'queues' store would not exist and db.transaction
// would throw; presumably that lands in the 'Exception in mq_clear_queue'
// path — confirm against the full function. The resolve(...) calls are also
// not visible, so the returned int values should be confirmed there too.
129EM_JS(
int, mq_clear_queue, (
const char* key), {
130 return Asyncify.handleAsync(function() {
131 var keyStr = UTF8ToString(key);
132 var operation = function() {
133 return new Promise(function(resolve) {
135 var request = indexedDB.open(
'YazeMessageQueue', 1);
137 request.onerror = function() {
138 console.error(
'Failed to open message queue database:', request.error);
142 request.onsuccess = function() {
143 var db = request.result;
144 var transaction = db.transaction([
'queues'],
'readwrite');
145 var store = transaction.objectStore(
'queues');
146 var deleteRequest = store.delete(keyStr);
148 deleteRequest.onsuccess = function() {
153 deleteRequest.onerror = function() {
154 console.error(
'Failed to clear message queue:', deleteRequest.error);
160 console.error(
'Exception in mq_clear_queue:', e);
// Route the operation through the page-provided async queue when present.
165 if (window.yazeAsyncQueue) {
166 return window.yazeAsyncQueue.enqueue(operation);
// Returns the current time in seconds as a double.
// emscripten_get_now() reports milliseconds, hence the division by 1000.
//
// NOTE(review): the Emscripten documentation describes emscripten_get_now()
// as not being an absolute time (it is only meaningful relative to other
// calls to the same function). Message timestamps taken from this helper are
// written to persistent storage and later subtracted from a fresh reading to
// compute message age — confirm that this stays valid across page reloads,
// or whether a wall-clock source is needed instead.
173static double GetCurrentTime() {
174 return emscripten_get_now() / 1000.0;
181 emscripten_log(EM_LOG_WARN,
"Failed to load message queue from storage: %s",
182 status.ToString().c_str());
188 if (auto_persist_ && !queue_.empty()) {
191 emscripten_log(EM_LOG_ERROR,
"Failed to persist message queue on destruction: %s",
192 status.ToString().c_str());
198 const std::string& payload) {
199 std::lock_guard<std::mutex> lock(queue_mutex_);
202 if (queue_.size() >= max_queue_size_) {
209 msg.message_type = message_type;
210 msg.payload = payload;
211 msg.timestamp = GetCurrentTime();
213 msg.id = GenerateMessageId();
216 queue_.push_back(msg);
220 NotifyStatusChange();
230 emscripten_log(EM_LOG_WARN,
"Already replaying messages, skipping replay request");
234 is_replaying_ =
true;
239 std::vector<QueuedMessage> messages_to_send;
241 std::lock_guard<std::mutex> lock(queue_mutex_);
242 messages_to_send.reserve(queue_.size());
243 for (
const auto& msg : queue_) {
244 messages_to_send.push_back(msg);
249 std::vector<std::string> successful_ids;
250 std::vector<QueuedMessage> failed_messages;
252 for (
auto& msg : messages_to_send) {
254 double age = GetCurrentTime() - msg.timestamp;
255 if (age > message_expiry_seconds_) {
260 auto status = sender(msg.message_type, msg.payload);
263 successful_ids.push_back(msg.id);
269 if (msg.retry_count >= max_retries) {
271 failed_messages.push_back(msg);
279 emscripten_log(EM_LOG_WARN,
"Failed to replay message %s (attempt %d): %s",
280 msg.id.c_str(), msg.retry_count, status.ToString().c_str());
286 std::lock_guard<std::mutex> lock(queue_mutex_);
289 for (
const auto&
id : successful_ids) {
291 std::remove_if(queue_.begin(), queue_.end(),
292 [&
id](
const QueuedMessage& m) { return m.id == id; }),
297 for (
const auto& msg : failed_messages) {
298 failed_messages_.push_back(msg);
300 std::remove_if(queue_.begin(), queue_.end(),
301 [&msg](
const QueuedMessage& m) { return m.id == msg.id; }),
306 is_replaying_ =
false;
309 if (replay_complete_callback_) {
310 replay_complete_callback_(replayed, failed);
314 NotifyStatusChange();
321 std::lock_guard<std::mutex> lock(queue_mutex_);
322 return queue_.size();
326 std::lock_guard<std::mutex> lock(queue_mutex_);
329 status.pending_count = queue_.size();
330 status.failed_count = failed_messages_.size();
331 status.total_bytes = CalculateTotalBytes();
333 if (!queue_.empty()) {
334 double now = GetCurrentTime();
335 status.oldest_message_age = now - queue_.front().timestamp;
339 status.is_persisted = auto_persist_;
345 std::lock_guard<std::mutex> lock(queue_mutex_);
347 failed_messages_.clear();
350 mq_clear_queue(kStorageKey);
352 NotifyStatusChange();
356 std::lock_guard<std::mutex> lock(queue_mutex_);
357 failed_messages_.clear();
359 NotifyStatusChange();
364 std::lock_guard<std::mutex> lock(queue_mutex_);
367 auto it = std::find_if(queue_.begin(), queue_.end(),
368 [&message_id](
const QueuedMessage& m) {
369 return m.id == message_id;
372 if (it != queue_.end()) {
374 NotifyStatusChange();
380 auto failed_it = std::find_if(failed_messages_.begin(), failed_messages_.end(),
381 [&message_id](
const QueuedMessage& m) {
382 return m.id == message_id;
385 if (failed_it != failed_messages_.end()) {
386 failed_messages_.erase(failed_it);
387 NotifyStatusChange();
396 std::lock_guard<std::mutex> lock(queue_mutex_);
400 nlohmann::json json_data;
401 json_data[
"version"] = 1;
402 json_data[
"timestamp"] = GetCurrentTime();
405 nlohmann::json queue_array = nlohmann::json::array();
406 for (
const auto& msg : queue_) {
407 nlohmann::json msg_json;
408 msg_json[
"id"] = msg.id;
409 msg_json[
"type"] = msg.message_type;
410 msg_json[
"payload"] = msg.payload;
411 msg_json[
"timestamp"] = msg.timestamp;
412 msg_json[
"retry_count"] = msg.retry_count;
413 queue_array.push_back(msg_json);
415 json_data[
"queue"] = queue_array;
418 nlohmann::json failed_array = nlohmann::json::array();
419 for (
const auto& msg : failed_messages_) {
420 nlohmann::json msg_json;
421 msg_json[
"id"] = msg.id;
422 msg_json[
"type"] = msg.message_type;
423 msg_json[
"payload"] = msg.payload;
424 msg_json[
"timestamp"] = msg.timestamp;
425 msg_json[
"retry_count"] = msg.retry_count;
426 failed_array.push_back(msg_json);
428 json_data[
"failed"] = failed_array;
431 json_data[
"stats"][
"total_enqueued"] = total_enqueued_;
432 json_data[
"stats"][
"total_replayed"] = total_replayed_;
433 json_data[
"stats"][
"total_failed"] = total_failed_;
436 std::string json_str = json_data.dump();
437 int result = mq_save_queue(kStorageKey, json_str.c_str());
440 return absl::InternalError(
"Failed to save message queue to IndexedDB");
443 return absl::OkStatus();
445 }
catch (
const std::exception& e) {
446 return absl::InternalError(absl::StrFormat(
"Failed to serialize message queue: %s", e.what()));
451 char* json_ptr = mq_load_queue(kStorageKey);
454 return absl::OkStatus();
458 std::string json_str(json_ptr);
461 nlohmann::json json_data = nlohmann::json::parse(json_str);
464 int version = json_data.value(
"version", 0);
466 return absl::InvalidArgumentError(absl::StrFormat(
"Unsupported queue version: %d", version));
469 std::lock_guard<std::mutex> lock(queue_mutex_);
473 failed_messages_.clear();
476 if (json_data.contains(
"queue")) {
477 for (
const auto& msg_json : json_data[
"queue"]) {
479 msg.id = msg_json.value(
"id",
"");
480 msg.message_type = msg_json.value(
"type",
"");
481 msg.payload = msg_json.value(
"payload",
"");
482 msg.timestamp = msg_json.value(
"timestamp", 0.0);
483 msg.retry_count = msg_json.value(
"retry_count", 0);
486 double age = GetCurrentTime() - msg.timestamp;
487 if (age <= message_expiry_seconds_) {
488 queue_.push_back(msg);
494 if (json_data.contains(
"failed")) {
495 for (
const auto& msg_json : json_data[
"failed"]) {
497 msg.id = msg_json.value(
"id",
"");
498 msg.message_type = msg_json.value(
"type",
"");
499 msg.payload = msg_json.value(
"payload",
"");
500 msg.timestamp = msg_json.value(
"timestamp", 0.0);
501 msg.retry_count = msg_json.value(
"retry_count", 0);
504 failed_messages_.push_back(msg);
509 if (json_data.contains(
"stats")) {
510 total_enqueued_ = json_data[
"stats"].value(
"total_enqueued", 0);
511 total_replayed_ = json_data[
"stats"].value(
"total_replayed", 0);
512 total_failed_ = json_data[
"stats"].value(
"total_failed", 0);
515 emscripten_log(EM_LOG_INFO,
"Loaded %zu messages from storage (%zu failed)",
516 queue_.size(), failed_messages_.size());
518 NotifyStatusChange();
519 return absl::OkStatus();
521 }
catch (
const std::exception& e) {
523 return absl::InvalidArgumentError(absl::StrFormat(
"Failed to parse saved queue: %s", e.what()));
528 std::lock_guard<std::mutex> lock(queue_mutex_);
529 return std::vector<QueuedMessage>(queue_.begin(), queue_.end());
533 std::lock_guard<std::mutex> lock(queue_mutex_);
535 double now = GetCurrentTime();
536 size_t initial_size = queue_.size();
540 std::remove_if(queue_.begin(), queue_.end(),
541 [now,
this](
const QueuedMessage& msg) {
542 return (now - msg.timestamp) > message_expiry_seconds_;
546 int removed = initial_size - queue_.size();
549 NotifyStatusChange();
// Builds a message identifier made of the current GetCurrentTime() value
// converted to whole milliseconds, an underscore, and eight random lowercase
// hexadecimal digits.
//
// The random engine is a function-local static std::mt19937 seeded once from
// std::random_device, so every call shares the same generator state.
//
// NOTE(review): nothing in this function synchronizes access to the shared
// generator; the call site visible in this file runs after queue_mutex_ has
// been locked — confirm that holds for every caller.
556std::string WasmMessageQueue::GenerateMessageId() {
557 static std::random_device rd;
558 static std::mt19937 gen(rd());
559 static std::uniform_int_distribution<> dis(0, 15);
560 static const char* hex_chars =
"0123456789abcdef";
562 std::stringstream ss;
// Time prefix in milliseconds (GetCurrentTime() is in seconds).
566 ss << static_cast<long long>(GetCurrentTime() * 1000) <<
"_";
// Eight hex digits give a 32-bit random suffix.
569 for (
int i = 0; i < 8; i++) {
570 ss << hex_chars[dis(gen)];
// Sums a per-message byte count over both queue_ and failed_messages_.
// For each message it adds the lengths of message_type, payload and id plus
// sizeof(timestamp) and sizeof(retry_count). Container and allocator overhead
// are not part of the sum.
//
// NOTE(review): no lock is taken in the lines shown here; the caller visible
// in this file reads the value after locking queue_mutex_ — confirm that all
// callers do the same.
576size_t WasmMessageQueue::CalculateTotalBytes()
const {
// Messages still waiting to be sent.
579 for (
const auto& msg : queue_) {
580 total += msg.message_type.size();
581 total += msg.payload.size();
582 total += msg.id.size();
583 total +=
sizeof(msg.timestamp) +
sizeof(msg.retry_count);
// Messages kept in the failed list.
586 for (
const auto& msg : failed_messages_) {
587 total += msg.message_type.size();
588 total += msg.payload.size();
589 total += msg.id.size();
590 total +=
sizeof(msg.timestamp) +
sizeof(msg.retry_count);
// Status-change notification hook: the guarded section only runs when
// status_change_callback_ is set.
//
// NOTE(review): several call sites in this file appear to invoke this after
// locking queue_mutex_ with std::lock_guard<std::mutex>. If the guarded
// section (not shown here) or the callback re-locks queue_mutex_, a
// non-recursive std::mutex would self-deadlock — confirm against the full
// body.
596void WasmMessageQueue::NotifyStatusChange() {
597 if (status_change_callback_) {
// Auto-persistence hook. The guard only proceeds when auto_persist_ is set
// and no replay is in progress (is_replaying_ is false). A failing status is
// logged as a warning; the function returns void, so the error is not
// propagated to the caller.
//
// NOTE(review): the call that produces `status` is not among the lines shown
// here — presumably the routine that writes the queue to storage; confirm.
602void WasmMessageQueue::MaybePersist() {
603 if (auto_persist_ && !is_replaying_) {
606 emscripten_log(EM_LOG_WARN,
"Failed to auto-persist message queue: %s",
607 status.ToString().c_str());
EM_JS(void, CallJsAiDriver,(const char *history_json), { if(window.yaze &&window.yaze.ai &&window.yaze.ai.processAgentRequest) { window.yaze.ai.processAgentRequest(UTF8ToString(history_json));} else { console.error("AI Driver not found in window.yaze.ai.processAgentRequest");} })