// Pairs a WasmCollaboration client with a WasmMessageQueue: the visible
// methods send changes and cursor positions through the client, and enqueue
// them as JSON payloads on the queue when they cannot be sent.
// NOTE(review): this listing appears to have lost lines (access specifiers,
// closing braces, some short statements) — confirm against the real file.
17class CollaborationWithOfflineSupport {
19 CollaborationWithOfflineSupport()
20 : collaboration_(std::make_unique<WasmCollaboration>()),
21 message_queue_(std::make_unique<WasmMessageQueue>()) {
24 message_queue_->SetAutoPersist(
true);
25 message_queue_->SetMaxQueueSize(500);
26 message_queue_->SetMessageExpiry(86400.0);
32 auto status = message_queue_->LoadFromStorage();
34 emscripten_log(EM_LOG_WARN,
"Failed to load offline queue: %s",
35 status.ToString().c_str());
40 void SendChange(uint32_t offset,
const std::vector<uint8_t>& old_data,
41 const std::vector<uint8_t>& new_data) {
42 if (collaboration_->IsConnected()) {
44 auto status = collaboration_->BroadcastChange(offset, old_data, new_data);
48 QueueChange(offset, old_data, new_data);
52 QueueChange(offset, old_data, new_data);
57 void SendCursorPosition(
const std::string& editor_type,
int x,
int y,
int map_id) {
58 if (collaboration_->IsConnected()) {
59 auto status = collaboration_->SendCursorPosition(editor_type, x, y, map_id);
62 QueueCursorPosition(editor_type, x, y, map_id);
65 QueueCursorPosition(editor_type, x, y, map_id);
70 void OnConnectionEstablished() {
71 emscripten_log(EM_LOG_INFO,
"Connection established, replaying queued messages...");
74 auto sender = [
this](
const std::string& message_type,
const std::string& payload) -> absl::Status {
77 nlohmann::json data = nlohmann::json::parse(payload);
79 if (message_type ==
"change") {
80 uint32_t offset = data[
"offset"];
81 std::vector<uint8_t> old_data = data[
"old_data"];
82 std::vector<uint8_t> new_data = data[
"new_data"];
83 return collaboration_->BroadcastChange(offset, old_data, new_data);
84 }
else if (message_type ==
"cursor") {
85 std::string editor_type = data[
"editor_type"];
88 int map_id = data.value(
"map_id", -1);
89 return collaboration_->SendCursorPosition(editor_type, x, y, map_id);
92 return absl::InvalidArgumentError(
"Unknown message type: " + message_type);
93 }
catch (
const std::exception& e) {
94 return absl::InvalidArgumentError(
"Failed to parse payload: " + std::string(e.what()));
99 message_queue_->ReplayAll(sender, 3);
103 WasmMessageQueue::QueueStatus GetQueueStatus()
const {
104 return message_queue_->GetStatus();
109 message_queue_->Clear();
113 void PruneOldMessages() {
114 int removed = message_queue_->PruneExpiredMessages();
116 emscripten_log(EM_LOG_INFO,
"Pruned %d expired messages", removed);
// Registers three callbacks: replay-complete and status-change on the
// message queue, and a connection-status callback on the collaboration
// client.
// NOTE(review): this method is missing lines in the listing (the inline-JS
// openers, several object fields, the branch on `connected`, closing
// braces). The comments below describe only what is visible.
121 void SetupCallbacks() {
// Replay-complete hook: logs the sent/failed counts, then passes both to
// JavaScript, which calls window.showNotification (when defined) with a
// "Synced N changes[, M failed]" message and 'warning' if any failed,
// otherwise 'success'.
123 message_queue_->SetOnReplayComplete([](
int replayed,
int failed) {
124 emscripten_log(EM_LOG_INFO,
"Replay complete: %d sent, %d failed", replayed, failed);
128 if (window.showNotification) {
129 const message = `Synced ${$0} changes` + ($1 > 0 ? `, ${$1} failed` :
'');
130 window.showNotification(message, $1 > 0 ?
'warning' :
'success');
132 }, replayed, failed);
// Status-change hook: forwards pending_count, failed_count, total_bytes,
// oldest_message_age and is_persisted (as $0..$4) to
// window.updateQueueStatus when that function is defined. Only the
// oldestMessageAge field of the JS object is visible here.
136 message_queue_->SetOnStatusChange([](
const WasmMessageQueue::QueueStatus& status) {
139 if (window.updateQueueStatus) {
140 window.updateQueueStatus({
144 oldestMessageAge: $3,
148 }, status.pending_count, status.failed_count, status.total_bytes,
149 status.oldest_message_age, status.is_persisted);
// Connection-status hook: calls OnConnectionEstablished() and logs
// "Connection lost" with the supplied message; presumably these are the two
// arms of a branch on `connected` — TODO confirm, the branch is not visible.
// The lambda captures `this`, so this object must outlive the callback.
153 collaboration_->SetStatusCallback([
this](
bool connected,
const std::string& message) {
156 OnConnectionEstablished();
159 emscripten_log(EM_LOG_INFO,
"Connection lost: %s", message.c_str());
164 void QueueChange(uint32_t offset,
const std::vector<uint8_t>& old_data,
165 const std::vector<uint8_t>& new_data) {
166 nlohmann::json payload;
167 payload[
"offset"] = offset;
168 payload[
"old_data"] = old_data;
169 payload[
"new_data"] = new_data;
170 payload[
"timestamp"] = emscripten_get_now() / 1000.0;
172 std::string msg_id = message_queue_->Enqueue(
"change", payload.dump());
173 emscripten_log(EM_LOG_DEBUG,
"Queued change message: %s", msg_id.c_str());
176 void QueueCursorPosition(
const std::string& editor_type,
int x,
int y,
int map_id) {
177 nlohmann::json payload;
178 payload[
"editor_type"] = editor_type;
182 payload[
"map_id"] = map_id;
184 payload[
"timestamp"] = emscripten_get_now() / 1000.0;
186 std::string msg_id = message_queue_->Enqueue(
"cursor", payload.dump());
187 emscripten_log(EM_LOG_DEBUG,
"Queued cursor message: %s", msg_id.c_str());
// Collaboration client: queried for connectivity and used for live sends
// (BroadcastChange / SendCursorPosition).
190 std::unique_ptr<WasmCollaboration> collaboration_;
// Offline queue: receives "change"/"cursor" payloads via Enqueue and replays
// them via ReplayAll.
191 std::unique_ptr<WasmMessageQueue> message_queue_;
199void* create_collaboration_with_offline() {
200 return new CollaborationWithOfflineSupport();
205void send_change_with_queue(
void* instance, uint32_t offset,
206 uint8_t* old_data,
int old_size,
207 uint8_t* new_data,
int new_size) {
208 auto* collab =
static_cast<CollaborationWithOfflineSupport*
>(instance);
209 std::vector<uint8_t> old_vec(old_data, old_data + old_size);
210 std::vector<uint8_t> new_vec(new_data, new_data + new_size);
211 collab->SendChange(offset, old_vec, new_vec);
216int get_pending_message_count(
void* instance) {
217 auto* collab =
static_cast<CollaborationWithOfflineSupport*
>(instance);
218 return collab->GetQueueStatus().pending_count;
223void clear_offline_queue(
void* instance) {
224 auto* collab =
static_cast<CollaborationWithOfflineSupport*
>(instance);
225 collab->ClearQueue();
230void prune_old_messages(
void* instance) {
231 auto* collab =
static_cast<CollaborationWithOfflineSupport*
>(instance);
232 collab->PruneOldMessages();