diff --git a/lib/internal/blob.js b/lib/internal/blob.js index 5059b651f467ca..b1d6449fd959fe 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -403,9 +403,7 @@ function createBlobFromFilePath(path, options) { return lazyDOMException('The blob could not be read', 'NotReadableError'); } const { 0: blob, 1: length } = maybeBlob; - const res = createBlob(blob, length, options?.type); - res[kNotCloneable] = true; - return res; + return createBlob(blob, length, options?.type); } function arrayBuffer(blob) { diff --git a/src/dataqueue/queue.cc b/src/dataqueue/queue.cc index 537844806d3087..ec5a2c1c57063e 100644 --- a/src/dataqueue/queue.cc +++ b/src/dataqueue/queue.cc @@ -33,7 +33,7 @@ class NonIdempotentDataQueueReader; class EntryImpl : public DataQueue::Entry { public: - virtual std::shared_ptr get_reader() = 0; + std::shared_ptr get_reader(Environment* env = nullptr) override = 0; }; class DataQueueImpl final : public DataQueue, @@ -183,7 +183,7 @@ class DataQueueImpl final : public DataQueue, return !backpressure_listeners_.empty(); } - std::shared_ptr get_reader() override; + std::shared_ptr get_reader(Environment* env = nullptr) override; SET_MEMORY_INFO_NAME(DataQueue) SET_SELF_SIZE(DataQueueImpl) @@ -521,7 +521,7 @@ class NonIdempotentDataQueueReader final bool pull_pending_ = false; }; -std::shared_ptr DataQueueImpl::get_reader() { +std::shared_ptr DataQueueImpl::get_reader(Environment* env) { if (is_idempotent()) { return std::make_shared(shared_from_this()); } @@ -573,7 +573,7 @@ class EmptyEntry final : public EntryImpl { EmptyEntry& operator=(const EmptyEntry&) = delete; EmptyEntry& operator=(EmptyEntry&&) = delete; - std::shared_ptr get_reader() override { + std::shared_ptr get_reader(Environment* env) override { return std::make_shared(); } @@ -661,7 +661,7 @@ class InMemoryEntry final : public EntryImpl { InMemoryEntry& operator=(const InMemoryEntry&) = delete; InMemoryEntry& operator=(InMemoryEntry&&) = delete; - std::shared_ptr get_reader() override { + std::shared_ptr get_reader(Environment* env) override { return std::make_shared(*this); } @@ -732,8 +732,8 @@ class DataQueueEntry : public EntryImpl { DataQueueEntry& operator=(const DataQueueEntry&) = delete; DataQueueEntry& operator=(DataQueueEntry&&) = delete; - std::shared_ptr get_reader() override { - return std::make_shared(data_queue_->get_reader()); + std::shared_ptr get_reader(Environment* env) override { + return std::make_shared(data_queue_->get_reader(env)); } std::unique_ptr slice( @@ -844,8 +844,8 @@ class FdEntry final : public EntryImpl { CHECK_LE(start, end); } - std::shared_ptr get_reader() override { - return ReaderImpl::Create(this); + std::shared_ptr get_reader(Environment* env) override { + return ReaderImpl::Create(this, env); } std::unique_ptr slice( @@ -901,7 +901,8 @@ class FdEntry final : public EntryImpl { public StreamListener, public std::enable_shared_from_this { public: - static std::shared_ptr Create(FdEntry* entry) { + static std::shared_ptr Create(FdEntry* entry, + Environment* env) { uv_fs_t req; auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); }); int file = @@ -910,7 +911,8 @@ class FdEntry final : public EntryImpl { uv_fs_close(nullptr, &req, file, nullptr); return nullptr; } - Realm* realm = entry->env()->principal_realm(); + Environment* reader_env = env ? env : entry->env(); + Realm* realm = reader_env->principal_realm(); return std::make_shared( BaseObjectPtr( fs::FileHandle::New(realm->GetBindingData(), diff --git a/src/dataqueue/queue.h b/src/dataqueue/queue.h index a37bd27549986e..4e7f1fc12e47c5 100644 --- a/src/dataqueue/queue.h +++ b/src/dataqueue/queue.h @@ -186,6 +186,10 @@ class DataQueue : public MemoryRetainer { // idempotent and cannot preserve that quality, subsequent reads // must fail with an error when a variance is detected. virtual bool is_idempotent() const = 0; + + // Create a reader for this entry. If `env` is provided, it may be used + // when the entry needs to create native resources in the current realm. + virtual std::shared_ptr get_reader(Environment* env = nullptr) = 0; }; // Creates an idempotent DataQueue with a pre-established collection @@ -228,7 +232,7 @@ class DataQueue : public MemoryRetainer { // any number of readers can be created, all of which are guaranteed // to provide the same data. Otherwise, only a single reader is // permitted. - virtual std::shared_ptr get_reader() = 0; + virtual std::shared_ptr get_reader(Environment* env = nullptr) = 0; // Append a single new entry to the queue. Appending is only allowed // when is_idempotent() is false. std::nullopt will be returned diff --git a/src/node_blob.cc b/src/node_blob.cc index 00deb82f46c322..c406ec81bce984 100644 --- a/src/node_blob.cc +++ b/src/node_blob.cc @@ -14,6 +14,10 @@ #include "v8.h" #include +#include +#include +#include +#include namespace node { @@ -103,6 +107,48 @@ void Concat(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(ArrayBuffer::New(isolate, std::move(store))); } +struct BlobURLEntry { + std::shared_ptr data_queue; + size_t length; + std::string type; +}; + +static std::mutex blob_url_registry_mutex; +static std::unordered_map blob_url_registry; + +static void RevokeBlobURLEntry(const std::string& uuid); + +static void BlobURLCleanupHook(void* arg) { + std::string* uuid = static_cast(arg); + RevokeBlobURLEntry(*uuid); + delete uuid; +} + +static void StoreBlobURLEntry(const std::string& uuid, + std::shared_ptr data_queue, + size_t length, + std::string type, + Environment* env) { + std::lock_guard lock(blob_url_registry_mutex); + blob_url_registry[uuid] = BlobURLEntry{std::move(data_queue), length, + std::move(type)}; + + std::string* uuid_copy = new std::string(uuid); + env->AddCleanupHook(BlobURLCleanupHook, uuid_copy); +} + +static std::optional GetBlobURLEntry(const std::string& uuid) { + std::lock_guard lock(blob_url_registry_mutex); + auto it = blob_url_registry.find(uuid); + if (it == blob_url_registry.end()) return std::nullopt; + return it->second; +} + +static void RevokeBlobURLEntry(const std::string& uuid) { + std::lock_guard lock(blob_url_registry_mutex); + blob_url_registry.erase(uuid); +} + void BlobFromFilePath(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); BufferValue path(env->isolate(), args[0]); @@ -302,7 +348,7 @@ Blob::Reader::Reader(Environment* env, Local obj, BaseObjectPtr strong_ptr) : AsyncWrap(env, obj, AsyncWrap::PROVIDER_BLOBREADER), - inner_(strong_ptr->data_queue_->get_reader()), + inner_(strong_ptr->data_queue_->get_reader(env)), strong_ptr_(std::move(strong_ptr)) { MakeWeak(); } @@ -453,7 +499,6 @@ void Blob::StoreDataObject(const FunctionCallbackInfo& args) { CHECK(args[2]->IsUint32()); // Length CHECK(args[3]->IsString()); // Type - BlobBindingData* binding_data = realm->GetBindingData(); Isolate* isolate = realm->isolate(); Utf8Value key(isolate, args[0]); @@ -463,10 +508,8 @@ void Blob::StoreDataObject(const FunctionCallbackInfo& args) { size_t length = args[2].As()->Value(); Utf8Value type(isolate, args[3]); - binding_data->store_data_object( - key.ToString(), - BlobBindingData::StoredDataObject( - BaseObjectPtr(blob), length, type.ToString())); + StoreBlobURLEntry(key.ToString(), blob->data_queue_, length, + type.ToString(), realm->env()); } // Note: applying the V8 Fast API to the following function does not produce @@ -475,7 +518,6 @@ void Blob::RevokeObjectURL(const FunctionCallbackInfo& args) { CHECK_GE(args.Length(), 1); CHECK(args[0]->IsString()); Realm* realm = Realm::GetCurrent(args); - BlobBindingData* binding_data = realm->GetBindingData(); Isolate* isolate = realm->isolate(); Utf8Value input(isolate, args[0].As()); @@ -492,7 +534,7 @@ void Blob::RevokeObjectURL(const FunctionCallbackInfo& args) { auto end_index = pathname.find(':', start_index + 1); if (end_index == std::string_view::npos) { auto id = std::string(pathname.substr(start_index + 1)); - binding_data->revoke_data_object(id); + RevokeBlobURLEntry(id); } } } @@ -500,29 +542,30 @@ void Blob::RevokeObjectURL(const FunctionCallbackInfo& args) { void Blob::GetDataObject(const FunctionCallbackInfo& args) { CHECK(args[0]->IsString()); Realm* realm = Realm::GetCurrent(args); - BlobBindingData* binding_data = realm->GetBindingData(); Isolate* isolate = realm->isolate(); Utf8Value key(isolate, args[0]); + std::optional stored = GetBlobURLEntry(key.ToString()); + if (!stored.has_value()) return; + + Environment* env = realm->env(); + BaseObjectPtr blob = Blob::Create(env, stored->data_queue); + if (!blob) return; + + Local type; + if (!String::NewFromUtf8(isolate, + stored->type.c_str(), + NewStringType::kNormal, + static_cast(stored->type.length())) + .ToLocal(&type)) { + return; + } - BlobBindingData::StoredDataObject stored = - binding_data->get_data_object(key.ToString()); - if (stored.blob) { - Local type; - if (!String::NewFromUtf8(isolate, - stored.type.c_str(), - NewStringType::kNormal, - static_cast(stored.type.length())) - .ToLocal(&type)) { - return; - } - - Local values[] = {stored.blob->object(), - Uint32::NewFromUnsigned(isolate, stored.length), - type}; + Local values[] = {blob->object(), + Uint32::NewFromUnsigned(isolate, stored->length), + type}; - args.GetReturnValue().Set(Array::New(isolate, values, arraysize(values))); - } + args.GetReturnValue().Set(Array::New(isolate, values, arraysize(values))); } void BlobBindingData::StoredDataObject::MemoryInfo( diff --git a/test/parallel/test-blob-file-backed.js b/test/parallel/test-blob-file-backed.js index f94eae6d6ed760..a6fcdd5f9b054f 100644 --- a/test/parallel/test-blob-file-backed.js +++ b/test/parallel/test-blob-file-backed.js @@ -130,12 +130,9 @@ writeFileSync(testfile5, ''); })().then(common.mustCall()); (async () => { - // We currently do not allow File-backed blobs to be cloned or transferred - // across worker threads. This is largely because the underlying FdEntry - // is bound to the Environment/Realm under which is was created. const blob = await openAsBlob(__filename); - assert.throws(() => structuredClone(blob), { - code: 'ERR_INVALID_STATE', - message: 'Invalid state: File-backed Blobs are not cloneable' - }); + const clone = structuredClone(blob); + assert.strictEqual(clone.size, blob.size); + assert.strictEqual(clone.type, blob.type); + assert.strictEqual(await clone.text(), await blob.text()); })().then(common.mustCall()); diff --git a/test/parallel/test-blob-url-worker.js b/test/parallel/test-blob-url-worker.js new file mode 100644 index 00000000000000..630d578947ccf3 --- /dev/null +++ b/test/parallel/test-blob-url-worker.js @@ -0,0 +1,30 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { isMainThread, parentPort, Worker } = require('worker_threads'); +const { Blob } = require('buffer'); + +if (isMainThread) { + const blob = new Blob(['hello world']); + const url = URL.createObjectURL(blob); + + const worker = new Worker(__filename); + worker.once('message', common.mustCall((value) => { + assert.deepStrictEqual(value, { size: 11, type: '', text: 'hello world' }); + worker.terminate(); + })); + worker.once('error', common.mustNotCall()); + worker.once('exit', common.mustCall((code) => { + assert.strictEqual(code, 0); + })); + + worker.postMessage(url); +} else { + const { resolveObjectURL } = require('buffer'); + parentPort.once('message', async (url) => { + const blob = resolveObjectURL(url); + const text = await blob.text(); + parentPort.postMessage({ size: blob.size, type: blob.type, text }); + }); +}