Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions packages/ndk/lib/domain_layer/usecases/cache_write/cache_write.dart
Original file line number Diff line number Diff line change
@@ -1,30 +1,45 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../shared/isolates/isolate_manager_io.dart';
import '../../../shared/logger/logger.dart';
import '../../entities/nip_01_event.dart';
import '../../repositories/cache_manager.dart';

/// class to handle writes to cache/db with business logic
class CacheWrite {
final CacheManager cacheManager;
final Duration writeBufferDuration;

CacheWrite(this.cacheManager);
CacheWrite(
this.cacheManager, {
this.writeBufferDuration = const Duration(seconds: 5),
});

/// saves network responses in db and then write to response stream if not already in db (useful to avoid duplicates)
void saveNetworkResponse({
required bool writeToCache,
required Stream<Nip01Event> inputStream,
}) {
inputStream.listen((event) async {
Logger.log.t(() => "⛁ got event from network $event ");

if (writeToCache) {
await cacheManager.saveEvent(event);
}
}, onDone: () {
//? cannot be implemented as stack insert when the stream closes, because it would screw up subscriptions.
}, onError: (error) {
Logger.log.e(() => "⛔ $error ");
});
inputStream
.doOnData((event) {
Logger.log.t(() => "⛁ got event from network $event ");
})
.bufferTime(writeBufferDuration)
.where((events) => writeToCache && events.isNotEmpty)
.listen(
(events) {
Logger.log.t(() =>
"CacheWrite - 💾 Saving batch of ${events.length} events");
cacheManager.saveEvents(events);
},
onDone: () {
Logger.log.t(() => "CacheWrite - ✓ Stream completed");
},
onError: (error) {
Logger.log.e(() => "CacheWrite - ⛔ $error ");
},
);
}
}
52 changes: 52 additions & 0 deletions packages/ndk/lib/simple_profiler.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/// DO NOT USE IN PRODUCTION CODE! Only for debugging and testing purposes.
class SimpleProfiler {
final String name;
final DateTime startTime;
DateTime _lastCheckpoint;

SimpleProfiler(this.name)
: startTime = DateTime.now(),
_lastCheckpoint = DateTime.now() {
// ignore: avoid_print
print('Starting $name at $startTime');
}

void checkpoint(String description) {
final now = DateTime.now();
final totalDuration = now.difference(startTime);
final checkpointDuration = now.difference(_lastCheckpoint);

// ignore: avoid_print
print('$name - $description:'
'\n\t$name Total: ${totalDuration.inMilliseconds}ms'
'\n\t$name Since last checkpoint: ${checkpointDuration.inMilliseconds}ms');

_lastCheckpoint = now;
}

void end() {
final now = DateTime.now();
final totalDuration = now.difference(startTime);
final checkpointDuration = now.difference(_lastCheckpoint);

// ignore: avoid_print
print('Ended $name:'
'\n\t$name Total time: ${totalDuration.inMilliseconds}ms'
'\n\t$name Since last checkpoint: ${checkpointDuration.inMilliseconds}ms');
}
}

/**
// Usage:
Future<void> someFunction() async {
final profiler = SimpleProfiler('MyOperation');

await step1();
profiler.checkpoint('Step 1 completed');

await step2();
profiler.checkpoint('Step 2 completed');

profiler.end();
}
*/
29 changes: 12 additions & 17 deletions packages/objectbox/lib/data_layer/db/object_box/db_object_box.dart
Original file line number Diff line number Diff line change
Expand Up @@ -270,21 +270,15 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
Future<void> saveEvent(Nip01Event event) async {
await dbRdy;
final eventBox = _objectBox.store.box<DbNip01Event>();
final existingEvent = eventBox
.query(DbNip01Event_.nostrId.equals(event.id))
.build()
.findFirst();
if (existingEvent != null) {
eventBox.remove(existingEvent.dbId);
}
eventBox.put(DbNip01Event.fromNdk(event));
await eventBox.putAsync(DbNip01Event.fromNdk(event), mode: PutMode.put);
}

@override
Future<void> saveEvents(List<Nip01Event> events) async {
await dbRdy;
final eventBox = _objectBox.store.box<DbNip01Event>();
eventBox.putMany(events.map((e) => DbNip01Event.fromNdk(e)).toList());
await eventBox
.putManyAsync(events.map((e) => DbNip01Event.fromNdk(e)).toList());
}

@override
Expand All @@ -303,7 +297,7 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
metadata.updatedAt! < existingMetadatas[0].updatedAt!) {
return;
}
metadataBox.put(DbMetadata.fromNdk(metadata));
metadataBox.putAsync(DbMetadata.fromNdk(metadata));
}

@override
Expand Down Expand Up @@ -608,7 +602,7 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
nip05.networkFetchTime! < existing[0].networkFetchTime!) {
return;
}
box.put(DbNip05.fromNdk(nip05));
await box.putAsync(DbNip05.fromNdk(nip05));
}

@override
Expand All @@ -628,7 +622,7 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
if (existing != null) {
box.remove(existing.dbId);
}
box.put(DbRelaySet.fromNdk(relaySet));
await box.putAsync(DbRelaySet.fromNdk(relaySet));
}

@override
Expand All @@ -643,11 +637,12 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
if (existingUserRelayList != null) {
userRelayListBox.remove(existingUserRelayList.dbId);
}
userRelayListBox.put(DbUserRelayList.fromNdk(userRelayList));
await userRelayListBox.putAsync(DbUserRelayList.fromNdk(userRelayList));
}

@override
Future<void> saveUserRelayLists(List<UserRelayList> userRelayLists) async {
await dbRdy;
final wait = <Future>[];
for (final userRelayList in userRelayLists) {
wait.add(saveUserRelayList(userRelayList));
Expand Down Expand Up @@ -714,15 +709,15 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
FilterFetchedRangeRecord record) async {
await dbRdy;
final box = _objectBox.store.box<DbFilterFetchedRangeRecord>();
box.put(DbFilterFetchedRangeRecord.fromNdk(record));
await box.putAsync(DbFilterFetchedRangeRecord.fromNdk(record));
}

@override
Future<void> saveFilterFetchedRangeRecords(
List<FilterFetchedRangeRecord> records) async {
await dbRdy;
final box = _objectBox.store.box<DbFilterFetchedRangeRecord>();
box.putMany(
await box.putManyAsync(
records.map((r) => DbFilterFetchedRangeRecord.fromNdk(r)).toList());
}

Expand Down Expand Up @@ -810,7 +805,7 @@ class DbObjectBox extends WalletsRepo implements CacheManager {
Future<void> removeAllFilterFetchedRangeRecords() async {
await dbRdy;
final box = _objectBox.store.box<DbFilterFetchedRangeRecord>();
box.removeAll();
await box.removeAll();
}

@override
Expand Down Expand Up @@ -911,7 +906,7 @@ class DbObjectBox extends WalletsRepo implements CacheManager {

@override
Future<void> saveKeyset(CahsuKeyset keyset) async {
_objectBox.store.box<DbWalletCahsuKeyset>().put(
await _objectBox.store.box<DbWalletCahsuKeyset>().putAsync(
DbWalletCahsuKeyset.fromNdk(keyset),
);
return Future.value();
Expand Down
Loading
Loading