yaze 0.3.2
Link to the Past ROM Editor
 
Loading...
Searching...
No Matches
wasm_message_queue.cc
Go to the documentation of this file.
1// clang-format off
2#ifdef __EMSCRIPTEN__
3
5
6#include <emscripten.h>
7#include <random>
8#include <sstream>
9
10#include "absl/strings/str_format.h"
11
12namespace yaze {
13namespace app {
14namespace platform {
15
16// JavaScript IndexedDB interface for message queue persistence
17// All functions use yazeAsyncQueue to serialize async operations
18EM_JS(int, mq_save_queue, (const char* key, const char* json_data), {
19 return Asyncify.handleAsync(function() {
20 var keyStr = UTF8ToString(key);
21 var jsonStr = UTF8ToString(json_data);
22 var operation = function() {
23 return new Promise(function(resolve) {
24 try {
25 // Open or create the database
26 var request = indexedDB.open('YazeMessageQueue', 1);
27
28 request.onerror = function() {
29 console.error('Failed to open message queue database:', request.error);
30 resolve(-1);
31 };
32
33 request.onupgradeneeded = function(event) {
34 var db = event.target.result;
35 if (!db.objectStoreNames.contains('queues')) {
36 db.createObjectStore('queues');
37 }
38 };
39
40 request.onsuccess = function() {
41 var db = request.result;
42 var transaction = db.transaction(['queues'], 'readwrite');
43 var store = transaction.objectStore('queues');
44 var putRequest = store.put(jsonStr, keyStr);
45
46 putRequest.onsuccess = function() {
47 db.close();
48 resolve(0);
49 };
50
51 putRequest.onerror = function() {
52 console.error('Failed to save message queue:', putRequest.error);
53 db.close();
54 resolve(-1);
55 };
56 };
57 } catch (e) {
58 console.error('Exception in mq_save_queue:', e);
59 resolve(-1);
60 }
61 });
62 };
63 if (window.yazeAsyncQueue) {
64 return window.yazeAsyncQueue.enqueue(operation);
65 }
66 return operation();
67 });
68});
69
70EM_JS(char*, mq_load_queue, (const char* key), {
71 return Asyncify.handleAsync(function() {
72 var keyStr = UTF8ToString(key);
73 var operation = function() {
74 return new Promise(function(resolve) {
75 try {
76 var request = indexedDB.open('YazeMessageQueue', 1);
77
78 request.onerror = function() {
79 console.error('Failed to open message queue database:', request.error);
80 resolve(0);
81 };
82
83 request.onupgradeneeded = function(event) {
84 var db = event.target.result;
85 if (!db.objectStoreNames.contains('queues')) {
86 db.createObjectStore('queues');
87 }
88 };
89
90 request.onsuccess = function() {
91 var db = request.result;
92 var transaction = db.transaction(['queues'], 'readonly');
93 var store = transaction.objectStore('queues');
94 var getRequest = store.get(keyStr);
95
96 getRequest.onsuccess = function() {
97 var result = getRequest.result;
98 db.close();
99
100 if (result && typeof result === 'string') {
101 var len = lengthBytesUTF8(result) + 1;
102 var ptr = Module._malloc(len);
103 stringToUTF8(result, ptr, len);
104 resolve(ptr);
105 } else {
106 resolve(0);
107 }
108 };
109
110 getRequest.onerror = function() {
111 console.error('Failed to load message queue:', getRequest.error);
112 db.close();
113 resolve(0);
114 };
115 };
116 } catch (e) {
117 console.error('Exception in mq_load_queue:', e);
118 resolve(0);
119 }
120 });
121 };
122 if (window.yazeAsyncQueue) {
123 return window.yazeAsyncQueue.enqueue(operation);
124 }
125 return operation();
126 });
127});
128
129EM_JS(int, mq_clear_queue, (const char* key), {
130 return Asyncify.handleAsync(function() {
131 var keyStr = UTF8ToString(key);
132 var operation = function() {
133 return new Promise(function(resolve) {
134 try {
135 var request = indexedDB.open('YazeMessageQueue', 1);
136
137 request.onerror = function() {
138 console.error('Failed to open message queue database:', request.error);
139 resolve(-1);
140 };
141
142 request.onsuccess = function() {
143 var db = request.result;
144 var transaction = db.transaction(['queues'], 'readwrite');
145 var store = transaction.objectStore('queues');
146 var deleteRequest = store.delete(keyStr);
147
148 deleteRequest.onsuccess = function() {
149 db.close();
150 resolve(0);
151 };
152
153 deleteRequest.onerror = function() {
154 console.error('Failed to clear message queue:', deleteRequest.error);
155 db.close();
156 resolve(-1);
157 };
158 };
159 } catch (e) {
160 console.error('Exception in mq_clear_queue:', e);
161 resolve(-1);
162 }
163 });
164 };
165 if (window.yazeAsyncQueue) {
166 return window.yazeAsyncQueue.enqueue(operation);
167 }
168 return operation();
169 });
170});
171
172// Get current time in seconds since epoch
173static double GetCurrentTime() {
174 return emscripten_get_now() / 1000.0;
175}
176
178 // Attempt to load queue from storage on construction
179 auto status = LoadFromStorage();
180 if (!status.ok()) {
181 emscripten_log(EM_LOG_WARN, "Failed to load message queue from storage: %s",
182 status.ToString().c_str());
183 }
184}
185
187 // Persist queue on destruction if auto-persist is enabled
188 if (auto_persist_ && !queue_.empty()) {
189 auto status = PersistToStorage();
190 if (!status.ok()) {
191 emscripten_log(EM_LOG_ERROR, "Failed to persist message queue on destruction: %s",
192 status.ToString().c_str());
193 }
194 }
195}
196
197std::string WasmMessageQueue::Enqueue(const std::string& message_type,
198 const std::string& payload) {
199 std::lock_guard<std::mutex> lock(queue_mutex_);
200
201 // Check queue size limit
202 if (queue_.size() >= max_queue_size_) {
203 // Remove oldest message if at capacity
204 queue_.pop_front();
205 }
206
207 // Create new message
208 QueuedMessage msg;
209 msg.message_type = message_type;
210 msg.payload = payload;
211 msg.timestamp = GetCurrentTime();
212 msg.retry_count = 0;
213 msg.id = GenerateMessageId();
214
215 // Add to queue
216 queue_.push_back(msg);
217 total_enqueued_++;
218
219 // Notify listeners
220 NotifyStatusChange();
221
222 // Maybe persist to storage
223 MaybePersist();
224
225 return msg.id;
226}
227
228void WasmMessageQueue::ReplayAll(MessageSender sender, int max_retries) {
229 if (is_replaying_) {
230 emscripten_log(EM_LOG_WARN, "Already replaying messages, skipping replay request");
231 return;
232 }
233
234 is_replaying_ = true;
235 int replayed = 0;
236 int failed = 0;
237
238 // Copy queue to avoid holding lock during send operations
239 std::vector<QueuedMessage> messages_to_send;
240 {
241 std::lock_guard<std::mutex> lock(queue_mutex_);
242 messages_to_send.reserve(queue_.size());
243 for (const auto& msg : queue_) {
244 messages_to_send.push_back(msg);
245 }
246 }
247
248 // Process each message
249 std::vector<std::string> successful_ids;
250 std::vector<QueuedMessage> failed_messages;
251
252 for (auto& msg : messages_to_send) {
253 // Check if message has expired
254 double age = GetCurrentTime() - msg.timestamp;
255 if (age > message_expiry_seconds_) {
256 continue; // Skip expired messages
257 }
258
259 // Try to send the message
260 auto status = sender(msg.message_type, msg.payload);
261
262 if (status.ok()) {
263 successful_ids.push_back(msg.id);
264 replayed++;
265 total_replayed_++;
266 } else {
267 msg.retry_count++;
268
269 if (msg.retry_count >= max_retries) {
270 // Move to failed list
271 failed_messages.push_back(msg);
272 failed++;
273 total_failed_++;
274 } else {
275 // Keep in queue for retry
276 // Message stays in queue
277 }
278
279 emscripten_log(EM_LOG_WARN, "Failed to replay message %s (attempt %d): %s",
280 msg.id.c_str(), msg.retry_count, status.ToString().c_str());
281 }
282 }
283
284 // Update queue with results
285 {
286 std::lock_guard<std::mutex> lock(queue_mutex_);
287
288 // Remove successful messages
289 for (const auto& id : successful_ids) {
290 queue_.erase(
291 std::remove_if(queue_.begin(), queue_.end(),
292 [&id](const QueuedMessage& m) { return m.id == id; }),
293 queue_.end());
294 }
295
296 // Move failed messages to failed list
297 for (const auto& msg : failed_messages) {
298 failed_messages_.push_back(msg);
299 queue_.erase(
300 std::remove_if(queue_.begin(), queue_.end(),
301 [&msg](const QueuedMessage& m) { return m.id == msg.id; }),
302 queue_.end());
303 }
304 }
305
306 is_replaying_ = false;
307
308 // Notify completion
309 if (replay_complete_callback_) {
310 replay_complete_callback_(replayed, failed);
311 }
312
313 // Update status
314 NotifyStatusChange();
315
316 // Persist changes
317 MaybePersist();
318}
319
320size_t WasmMessageQueue::PendingCount() const {
321 std::lock_guard<std::mutex> lock(queue_mutex_);
322 return queue_.size();
323}
324
325WasmMessageQueue::QueueStatus WasmMessageQueue::GetStatus() const {
326 std::lock_guard<std::mutex> lock(queue_mutex_);
327
328 QueueStatus status;
329 status.pending_count = queue_.size();
330 status.failed_count = failed_messages_.size();
331 status.total_bytes = CalculateTotalBytes();
332
333 if (!queue_.empty()) {
334 double now = GetCurrentTime();
335 status.oldest_message_age = now - queue_.front().timestamp;
336 }
337
338 // Check if queue is persisted (simplified check)
339 status.is_persisted = auto_persist_;
340
341 return status;
342}
343
345 std::lock_guard<std::mutex> lock(queue_mutex_);
346 queue_.clear();
347 failed_messages_.clear();
348
349 // Clear from storage as well
350 mq_clear_queue(kStorageKey);
351
352 NotifyStatusChange();
353}
354
356 std::lock_guard<std::mutex> lock(queue_mutex_);
357 failed_messages_.clear();
358
359 NotifyStatusChange();
360 MaybePersist();
361}
362
363bool WasmMessageQueue::RemoveMessage(const std::string& message_id) {
364 std::lock_guard<std::mutex> lock(queue_mutex_);
365
366 // Try to remove from main queue
367 auto it = std::find_if(queue_.begin(), queue_.end(),
368 [&message_id](const QueuedMessage& m) {
369 return m.id == message_id;
370 });
371
372 if (it != queue_.end()) {
373 queue_.erase(it);
374 NotifyStatusChange();
375 MaybePersist();
376 return true;
377 }
378
379 // Try to remove from failed messages
380 auto failed_it = std::find_if(failed_messages_.begin(), failed_messages_.end(),
381 [&message_id](const QueuedMessage& m) {
382 return m.id == message_id;
383 });
384
385 if (failed_it != failed_messages_.end()) {
386 failed_messages_.erase(failed_it);
387 NotifyStatusChange();
388 MaybePersist();
389 return true;
390 }
391
392 return false;
393}
394
396 std::lock_guard<std::mutex> lock(queue_mutex_);
397
398 try {
399 // Create JSON representation
400 nlohmann::json json_data;
401 json_data["version"] = 1;
402 json_data["timestamp"] = GetCurrentTime();
403
404 // Serialize main queue
405 nlohmann::json queue_array = nlohmann::json::array();
406 for (const auto& msg : queue_) {
407 nlohmann::json msg_json;
408 msg_json["id"] = msg.id;
409 msg_json["type"] = msg.message_type;
410 msg_json["payload"] = msg.payload;
411 msg_json["timestamp"] = msg.timestamp;
412 msg_json["retry_count"] = msg.retry_count;
413 queue_array.push_back(msg_json);
414 }
415 json_data["queue"] = queue_array;
416
417 // Serialize failed messages
418 nlohmann::json failed_array = nlohmann::json::array();
419 for (const auto& msg : failed_messages_) {
420 nlohmann::json msg_json;
421 msg_json["id"] = msg.id;
422 msg_json["type"] = msg.message_type;
423 msg_json["payload"] = msg.payload;
424 msg_json["timestamp"] = msg.timestamp;
425 msg_json["retry_count"] = msg.retry_count;
426 failed_array.push_back(msg_json);
427 }
428 json_data["failed"] = failed_array;
429
430 // Save statistics
431 json_data["stats"]["total_enqueued"] = total_enqueued_;
432 json_data["stats"]["total_replayed"] = total_replayed_;
433 json_data["stats"]["total_failed"] = total_failed_;
434
435 // Convert to string and save
436 std::string json_str = json_data.dump();
437 int result = mq_save_queue(kStorageKey, json_str.c_str());
438
439 if (result != 0) {
440 return absl::InternalError("Failed to save message queue to IndexedDB");
441 }
442
443 return absl::OkStatus();
444
445 } catch (const std::exception& e) {
446 return absl::InternalError(absl::StrFormat("Failed to serialize message queue: %s", e.what()));
447 }
448}
449
451 char* json_ptr = mq_load_queue(kStorageKey);
452 if (!json_ptr) {
453 // No saved queue, which is fine
454 return absl::OkStatus();
455 }
456
457 try {
458 std::string json_str(json_ptr);
459 free(json_ptr);
460
461 nlohmann::json json_data = nlohmann::json::parse(json_str);
462
463 // Check version compatibility
464 int version = json_data.value("version", 0);
465 if (version != 1) {
466 return absl::InvalidArgumentError(absl::StrFormat("Unsupported queue version: %d", version));
467 }
468
469 std::lock_guard<std::mutex> lock(queue_mutex_);
470
471 // Clear current state
472 queue_.clear();
473 failed_messages_.clear();
474
475 // Load main queue
476 if (json_data.contains("queue")) {
477 for (const auto& msg_json : json_data["queue"]) {
478 QueuedMessage msg;
479 msg.id = msg_json.value("id", "");
480 msg.message_type = msg_json.value("type", "");
481 msg.payload = msg_json.value("payload", "");
482 msg.timestamp = msg_json.value("timestamp", 0.0);
483 msg.retry_count = msg_json.value("retry_count", 0);
484
485 // Skip expired messages
486 double age = GetCurrentTime() - msg.timestamp;
487 if (age <= message_expiry_seconds_) {
488 queue_.push_back(msg);
489 }
490 }
491 }
492
493 // Load failed messages
494 if (json_data.contains("failed")) {
495 for (const auto& msg_json : json_data["failed"]) {
496 QueuedMessage msg;
497 msg.id = msg_json.value("id", "");
498 msg.message_type = msg_json.value("type", "");
499 msg.payload = msg_json.value("payload", "");
500 msg.timestamp = msg_json.value("timestamp", 0.0);
501 msg.retry_count = msg_json.value("retry_count", 0);
502
503 // Keep failed messages for review even if expired
504 failed_messages_.push_back(msg);
505 }
506 }
507
508 // Load statistics
509 if (json_data.contains("stats")) {
510 total_enqueued_ = json_data["stats"].value("total_enqueued", 0);
511 total_replayed_ = json_data["stats"].value("total_replayed", 0);
512 total_failed_ = json_data["stats"].value("total_failed", 0);
513 }
514
515 emscripten_log(EM_LOG_INFO, "Loaded %zu messages from storage (%zu failed)",
516 queue_.size(), failed_messages_.size());
517
518 NotifyStatusChange();
519 return absl::OkStatus();
520
521 } catch (const std::exception& e) {
522 free(json_ptr);
523 return absl::InvalidArgumentError(absl::StrFormat("Failed to parse saved queue: %s", e.what()));
524 }
525}
526
527std::vector<WasmMessageQueue::QueuedMessage> WasmMessageQueue::GetQueuedMessages() const {
528 std::lock_guard<std::mutex> lock(queue_mutex_);
529 return std::vector<QueuedMessage>(queue_.begin(), queue_.end());
530}
531
533 std::lock_guard<std::mutex> lock(queue_mutex_);
534
535 double now = GetCurrentTime();
536 size_t initial_size = queue_.size();
537
538 // Remove expired messages
539 queue_.erase(
540 std::remove_if(queue_.begin(), queue_.end(),
541 [now, this](const QueuedMessage& msg) {
542 return (now - msg.timestamp) > message_expiry_seconds_;
543 }),
544 queue_.end());
545
546 int removed = initial_size - queue_.size();
547
548 if (removed > 0) {
549 NotifyStatusChange();
550 MaybePersist();
551 }
552
553 return removed;
554}
555
556std::string WasmMessageQueue::GenerateMessageId() {
557 static std::random_device rd;
558 static std::mt19937 gen(rd());
559 static std::uniform_int_distribution<> dis(0, 15);
560 static const char* hex_chars = "0123456789abcdef";
561
562 std::stringstream ss;
563 ss << "msg_";
564
565 // Add timestamp component
566 ss << static_cast<long long>(GetCurrentTime() * 1000) << "_";
567
568 // Add random component
569 for (int i = 0; i < 8; i++) {
570 ss << hex_chars[dis(gen)];
571 }
572
573 return ss.str();
574}
575
576size_t WasmMessageQueue::CalculateTotalBytes() const {
577 size_t total = 0;
578
579 for (const auto& msg : queue_) {
580 total += msg.message_type.size();
581 total += msg.payload.size();
582 total += msg.id.size();
583 total += sizeof(msg.timestamp) + sizeof(msg.retry_count);
584 }
585
586 for (const auto& msg : failed_messages_) {
587 total += msg.message_type.size();
588 total += msg.payload.size();
589 total += msg.id.size();
590 total += sizeof(msg.timestamp) + sizeof(msg.retry_count);
591 }
592
593 return total;
594}
595
596void WasmMessageQueue::NotifyStatusChange() {
597 if (status_change_callback_) {
598 status_change_callback_(GetStatus());
599 }
600}
601
602void WasmMessageQueue::MaybePersist() {
603 if (auto_persist_ && !is_replaying_) {
604 auto status = PersistToStorage();
605 if (!status.ok()) {
606 emscripten_log(EM_LOG_WARN, "Failed to auto-persist message queue: %s",
607 status.ToString().c_str());
608 }
609 }
610}
611
612} // namespace platform
613} // namespace app
614} // namespace yaze
615
616#endif // __EMSCRIPTEN__
617// clang-format on
std::vector< QueuedMessage > GetQueuedMessages() const
std::string Enqueue(const std::string &, const std::string &)
EM_JS(void, CallJsAiDriver,(const char *history_json), { if(window.yaze &&window.yaze.ai &&window.yaze.ai.processAgentRequest) { window.yaze.ai.processAgentRequest(UTF8ToString(history_json));} else { console.error("AI Driver not found in window.yaze.ai.processAgentRequest");} })