yaze 0.3.2
Link to the Past ROM Editor
 
Loading...
Searching...
No Matches
wasm_message_queue_usage.cc
Go to the documentation of this file.
1// Example: Using WasmMessageQueue with WasmCollaboration
2// This example shows how the collaboration system can use the message queue
3// for offline support and automatic replay when reconnecting.
4
5#ifdef __EMSCRIPTEN__
6
9#include <emscripten.h>
10#include <memory>
11
12namespace yaze {
13namespace app {
14namespace platform {
15
16// Example integration class that combines collaboration with offline queue
17class CollaborationWithOfflineSupport {
18 public:
19 CollaborationWithOfflineSupport()
20 : collaboration_(std::make_unique<WasmCollaboration>()),
21 message_queue_(std::make_unique<WasmMessageQueue>()) {
22
23 // Configure message queue
24 message_queue_->SetAutoPersist(true);
25 message_queue_->SetMaxQueueSize(500);
26 message_queue_->SetMessageExpiry(86400.0); // 24 hours
27
28 // Set up callbacks
29 SetupCallbacks();
30
31 // Load any previously queued messages
32 auto status = message_queue_->LoadFromStorage();
33 if (!status.ok()) {
34 emscripten_log(EM_LOG_WARN, "Failed to load offline queue: %s",
35 status.ToString().c_str());
36 }
37 }
38
39 // Send a change, queuing if offline
40 void SendChange(uint32_t offset, const std::vector<uint8_t>& old_data,
41 const std::vector<uint8_t>& new_data) {
42 if (collaboration_->IsConnected()) {
43 // Try to send directly
44 auto status = collaboration_->BroadcastChange(offset, old_data, new_data);
45
46 if (!status.ok()) {
47 // Failed to send, queue for later
48 QueueChange(offset, old_data, new_data);
49 }
50 } else {
51 // Not connected, queue for later
52 QueueChange(offset, old_data, new_data);
53 }
54 }
55
56 // Send cursor position, queuing if offline
57 void SendCursorPosition(const std::string& editor_type, int x, int y, int map_id) {
58 if (collaboration_->IsConnected()) {
59 auto status = collaboration_->SendCursorPosition(editor_type, x, y, map_id);
60
61 if (!status.ok()) {
62 QueueCursorPosition(editor_type, x, y, map_id);
63 }
64 } else {
65 QueueCursorPosition(editor_type, x, y, map_id);
66 }
67 }
68
69 // Called when connection is established
70 void OnConnectionEstablished() {
71 emscripten_log(EM_LOG_INFO, "Connection established, replaying queued messages...");
72
73 // Create sender function that uses the collaboration instance
74 auto sender = [this](const std::string& message_type, const std::string& payload) -> absl::Status {
75 // Parse the payload and send via collaboration
76 try {
77 nlohmann::json data = nlohmann::json::parse(payload);
78
79 if (message_type == "change") {
80 uint32_t offset = data["offset"];
81 std::vector<uint8_t> old_data = data["old_data"];
82 std::vector<uint8_t> new_data = data["new_data"];
83 return collaboration_->BroadcastChange(offset, old_data, new_data);
84 } else if (message_type == "cursor") {
85 std::string editor_type = data["editor_type"];
86 int x = data["x"];
87 int y = data["y"];
88 int map_id = data.value("map_id", -1);
89 return collaboration_->SendCursorPosition(editor_type, x, y, map_id);
90 }
91
92 return absl::InvalidArgumentError("Unknown message type: " + message_type);
93 } catch (const std::exception& e) {
94 return absl::InvalidArgumentError("Failed to parse payload: " + std::string(e.what()));
95 }
96 };
97
98 // Replay all queued messages
99 message_queue_->ReplayAll(sender, 3); // Max 3 retries per message
100 }
101
102 // Get queue status for UI display
103 WasmMessageQueue::QueueStatus GetQueueStatus() const {
104 return message_queue_->GetStatus();
105 }
106
107 // Clear all queued messages
108 void ClearQueue() {
109 message_queue_->Clear();
110 }
111
112 // Prune old messages
113 void PruneOldMessages() {
114 int removed = message_queue_->PruneExpiredMessages();
115 if (removed > 0) {
116 emscripten_log(EM_LOG_INFO, "Pruned %d expired messages", removed);
117 }
118 }
119
120 private:
121 void SetupCallbacks() {
122 // Set up replay complete callback
123 message_queue_->SetOnReplayComplete([](int replayed, int failed) {
124 emscripten_log(EM_LOG_INFO, "Replay complete: %d sent, %d failed", replayed, failed);
125
126 // Show notification to user
127 EM_ASM({
128 if (window.showNotification) {
129 const message = `Synced ${$0} changes` + ($1 > 0 ? `, ${$1} failed` : '');
130 window.showNotification(message, $1 > 0 ? 'warning' : 'success');
131 }
132 }, replayed, failed);
133 });
134
135 // Set up status change callback
136 message_queue_->SetOnStatusChange([](const WasmMessageQueue::QueueStatus& status) {
137 // Update UI with queue status
138 EM_ASM({
139 if (window.updateQueueStatus) {
140 window.updateQueueStatus({
141 pendingCount: $0,
142 failedCount: $1,
143 totalBytes: $2,
144 oldestMessageAge: $3,
145 isPersisted: $4
146 });
147 }
148 }, status.pending_count, status.failed_count, status.total_bytes,
149 status.oldest_message_age, status.is_persisted);
150 });
151
152 // Set up collaboration status callback
153 collaboration_->SetStatusCallback([this](bool connected, const std::string& message) {
154 if (connected) {
155 // Connection established, replay queued messages
156 OnConnectionEstablished();
157 } else {
158 // Connection lost
159 emscripten_log(EM_LOG_INFO, "Connection lost: %s", message.c_str());
160 }
161 });
162 }
163
164 void QueueChange(uint32_t offset, const std::vector<uint8_t>& old_data,
165 const std::vector<uint8_t>& new_data) {
166 nlohmann::json payload;
167 payload["offset"] = offset;
168 payload["old_data"] = old_data;
169 payload["new_data"] = new_data;
170 payload["timestamp"] = emscripten_get_now() / 1000.0;
171
172 std::string msg_id = message_queue_->Enqueue("change", payload.dump());
173 emscripten_log(EM_LOG_DEBUG, "Queued change message: %s", msg_id.c_str());
174 }
175
176 void QueueCursorPosition(const std::string& editor_type, int x, int y, int map_id) {
177 nlohmann::json payload;
178 payload["editor_type"] = editor_type;
179 payload["x"] = x;
180 payload["y"] = y;
181 if (map_id >= 0) {
182 payload["map_id"] = map_id;
183 }
184 payload["timestamp"] = emscripten_get_now() / 1000.0;
185
186 std::string msg_id = message_queue_->Enqueue("cursor", payload.dump());
187 emscripten_log(EM_LOG_DEBUG, "Queued cursor message: %s", msg_id.c_str());
188 }
189
190 std::unique_ptr<WasmCollaboration> collaboration_;
191 std::unique_ptr<WasmMessageQueue> message_queue_;
192};
193
194// JavaScript bindings for the enhanced collaboration
195extern "C" {
196
197// Create collaboration instance with offline support
198EMSCRIPTEN_KEEPALIVE
199void* create_collaboration_with_offline() {
200 return new CollaborationWithOfflineSupport();
201}
202
203// Send a change (with automatic queuing if offline)
204EMSCRIPTEN_KEEPALIVE
205void send_change_with_queue(void* instance, uint32_t offset,
206 uint8_t* old_data, int old_size,
207 uint8_t* new_data, int new_size) {
208 auto* collab = static_cast<CollaborationWithOfflineSupport*>(instance);
209 std::vector<uint8_t> old_vec(old_data, old_data + old_size);
210 std::vector<uint8_t> new_vec(new_data, new_data + new_size);
211 collab->SendChange(offset, old_vec, new_vec);
212}
213
214// Get queue status
215EMSCRIPTEN_KEEPALIVE
216int get_pending_message_count(void* instance) {
217 auto* collab = static_cast<CollaborationWithOfflineSupport*>(instance);
218 return collab->GetQueueStatus().pending_count;
219}
220
221// Clear offline queue
222EMSCRIPTEN_KEEPALIVE
223void clear_offline_queue(void* instance) {
224 auto* collab = static_cast<CollaborationWithOfflineSupport*>(instance);
225 collab->ClearQueue();
226}
227
228// Prune old messages
229EMSCRIPTEN_KEEPALIVE
230void prune_old_messages(void* instance) {
231 auto* collab = static_cast<CollaborationWithOfflineSupport*>(instance);
232 collab->PruneOldMessages();
233}
234
235} // extern "C"
236
237} // namespace platform
238} // namespace app
239} // namespace yaze
240
241#endif // __EMSCRIPTEN__