I'm trying to find a way to have Directus execute a single BULK INSERT in PostgreSQL for an items.create scenario on a child table, instead of one INSERT per item (which is what I see in the DB logs), to optimize performance, especially when dealing with hundreds or even thousands of items.
Thanks for any help.
Context: our mobile app calls our API and posts hundreds of values, which triggers a flow. Each of these values is then inserted one at a time (one INSERT statement per item) into a child table of SESSIONS, resulting in poor performance.
Directus's ItemsService doesn't implement a createMany() based on Knex's batchInsert, so there is no simple way to achieve this. Disabling Activity logging or Revisions is not enough, and neither is disabling filter or action triggers.
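For comparison, a raw Knex batchInsert is trivial and does produce multi-row INSERTs, but it bypasses the Directus pipeline entirely: no filter/action hooks, no permission checks, no activity, no revisions. A minimal sketch, assuming a hypothetical sessions_values child table:
import getDatabase from '@directus/api/database/index'

// Hypothetical example: `sessions_values` is a child table of SESSIONS
async function rawBulkInsert(sessionId: string, values: number[]) {
	const knex = getDatabase()
	const rows = values.map((value) => ({ session: sessionId, value }))
	// One multi-row INSERT per chunk of 500 rows instead of one INSERT per row
	await knex.batchInsert(`sessions_values`, rows, 500)
}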
I personally needed to send notifications to thousands of users, so I transposed the content of ItemsService.createOne() into a customized NotificationsService.createManyAtOnce():
import { Action } from '@directus/constants'
import {
InvalidPayloadError,
} from '@directus/api/errors/index'
import {
translateDatabaseError,
} from '@directus/api/database/errors/translate'
import type { SQLError } from '@directus/api/database/errors/dialects/types'
import getDatabase from '@directus/api/database/index'
import { getHelpers } from '@directus/api/database/helpers/index'
import emitter from '@directus/api/emitter'
import { shouldClearCache } from '@directus/api/utils/should-clear-cache'
import {
NotificationsService,
PayloadService,
AuthorizationService,
} from '@directus/api/services/index'
import {
type AbstractServiceOptions,
type MutationOptions,
type PrimaryKey,
type ActionEventParams,
} from '@directus/api/dist/types'
import type { Notification } from '@directus/types'
import { cloneDeep, pick, without } from 'lodash-es'
export class CustomNotificationService extends NotificationsService {
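	// Reference to the parent prototype so we can invoke the original
	// NotificationsService methods with this instance bound as `this`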
_parent = NotificationsService.prototype
constructor(options: AbstractServiceOptions) {
super(options)
}
async sendEmail(data: Partial<Notification & { sendEmail: boolean }>) {
if (data.sendEmail === true) {
this._parent.sendEmail.apply(this, [data])
}
}
async createManyAtOnce(
data: Partial<Notification>[],
opts: MutationOptions = {}
): Promise<PrimaryKey[]> {
if (! opts.mutationTracker) {
opts.mutationTracker = this.createMutationTracker()
}
if (! opts.bypassLimits) {
opts.mutationTracker.trackMutations(1)
}
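		// Imported lazily (mirroring ItemsService.createOne) to avoid circular dependencies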
const { ActivityService } = await import(`@directus/api/services/activity`)
const { RevisionsService } = await import(`@directus/api/services/revisions`)
const primaryKeyField = this.schema.collections[this.collection]!.primary
const fields = Object.keys(this.schema.collections[this.collection]!.fields)
const aliases = Object.values(this.schema.collections[this.collection]!.fields)
.filter((field) => field.alias === true)
.map((field) => field.field)
// By wrapping the logic in a transaction, we make sure we automatically roll back all the
// changes in the DB if any of the parts contained within throws an error. This also means
// that any errors thrown in any nested relational changes will bubble up and cancel the whole
// update tree
type ItemValues = {
primaryKey: PrimaryKey,
payload: typeof data[number],
payloadAfterHooks: Partial<typeof data[number]>,
payloadWithPresets: Partial<typeof data[number]>,
payloadWithoutAliases: Partial<typeof data[number]>,
revisionsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`revisions`],
revisionsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`revisions`],
revisionsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`revisions`],
nestedActionEventsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`nestedActionEvents`],
nestedActionEventsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`nestedActionEvents`],
nestedActionEventsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`nestedActionEvents`],
}
const itemsValues: ItemValues[] = await this.knex.transaction(async (trx) => {
// We're creating new services instances so they can use the transaction as their Knex interface
const payloadService = new PayloadService(this.collection, {
accountability: this.accountability,
knex: trx,
schema: this.schema,
})
const authorizationService = new AuthorizationService({
accountability: this.accountability,
knex: trx,
schema: this.schema,
})
const itemsAnalyzingValues: ItemValues[] = []
for (const payloadToClone of data) {
const payload = cloneDeep(payloadToClone)
// Run all hooks that are attached to this event so the end user has the chance to augment the
// item that is about to be saved
const payloadAfterHooks =
opts.emitEvents !== false
? await emitter.emitFilter(
this.eventScope === `items`
? [`items.create`, `${this.collection}.items.create`]
: `${this.eventScope}.create`,
payload,
{
collection: this.collection,
},
{
database: trx,
schema: this.schema,
accountability: this.accountability,
}
)
: payload
				if (payloadAfterHooks === undefined) {
					throw new InvalidPayloadError({ reason: `Create filter returned undefined` })
				}
				if (typeof payloadAfterHooks !== `object`) {
					// createOne() returns the value (primary key(s) or null) directly in this case,
					// but returning mid-loop would abort the whole batch, so reject instead
					throw new InvalidPayloadError({ reason: `Create filter returned a non-object payload` })
				}
const payloadWithPresets = this.accountability
? authorizationService.validatePayload(`create`, this.collection, payloadAfterHooks)
: payloadAfterHooks
if (opts.preMutationError) {
throw opts.preMutationError
}
const {
payload: payloadWithM2O,
revisions: revisionsM2O,
nestedActionEvents: nestedActionEventsM2O,
} = await payloadService.processM2O(payloadWithPresets, opts)
const {
payload: payloadWithA2O,
revisions: revisionsA2O,
nestedActionEvents: nestedActionEventsA2O,
} = await payloadService.processA2O(payloadWithM2O, opts)
const payloadWithoutAliases = pick(payloadWithA2O, without(fields, ...aliases))
const payloadWithTypeCasting = await payloadService.processValues(`create`, payloadWithoutAliases)
// In case of manual string / UUID primary keys, the PK already exists in the object we're saving.
const primaryKey = payloadWithTypeCasting[primaryKeyField]
itemsAnalyzingValues.push({
primaryKey,
payload,
payloadAfterHooks,
payloadWithPresets,
payloadWithoutAliases,
revisionsM2O,
revisionsA2O,
nestedActionEventsM2O,
nestedActionEventsA2O,
})
}
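			// Insert all rows in a single batched INSERT (Knex chunks internally) and read
			// the generated primary keys back via RETURNING, which PostgreSQL supports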
try {
const results = await trx
.batchInsert(
this.collection,
itemsAnalyzingValues.map((v) => v.payloadWithoutAliases)
)
.returning(primaryKeyField)
results.forEach((result, index) => {
if (! itemsAnalyzingValues[index]) {
throw new Error(`No batchInsert itemInput found for index ${index}`)
}
const returnedKey = typeof result === `object` ? result[primaryKeyField] : result
if (this.schema.collections[this.collection]!.fields[primaryKeyField]!.type === `uuid`) {
itemsAnalyzingValues[index].primaryKey
= getHelpers(trx).schema.formatUUID(
(itemsAnalyzingValues[index].primaryKey ?? returnedKey) as string
)
}
else {
itemsAnalyzingValues[index].primaryKey
= itemsAnalyzingValues[index].primaryKey ?? returnedKey
}
})
}
catch (err: unknown) {
throw await translateDatabaseError(err as SQLError)
}
			// TODO
			// Most databases support RETURNING; those that don't tend to return the PK anyway
			// (MySQL/SQLite). In case the primary key isn't known yet, we'd do a best-attempt at
			// fetching it based on the last inserted row:
// if (! primaryKey) {
// // Fetching it with max should be safe, as we're in the context of the current transaction
// const result = await trx.max(primaryKeyField, { as: `id` }).from(this.collection)
// .first()
// primaryKey = result.id
// // Set the primary key on the input item, in order for the "after" event hook to be able
// // to read from it
// payload[primaryKeyField] = primaryKey
// }
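			// Now that every row has a primary key, run the per-item follow-up work:
			// O2M relations, activity tracking, and revisions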
for (const itemsAnalyzingValue of itemsAnalyzingValues) {
const {
primaryKey,
payloadAfterHooks,
payloadWithPresets,
revisionsM2O,
revisionsA2O,
} = itemsAnalyzingValue
const {
revisions: revisionsO2M,
nestedActionEvents: nestedActionEventsO2M,
} = await payloadService.processO2M(
payloadWithPresets,
primaryKey,
opts
)
itemsAnalyzingValue.nestedActionEventsO2M = nestedActionEventsO2M
// If this is an authenticated action, and accountability tracking is enabled, save activity row
if (this.accountability
&& this.schema.collections[this.collection]!.accountability !== null
) {
const activityService = new ActivityService({
knex: trx,
schema: this.schema,
})
const activity = await activityService.createOne({
action: Action.CREATE,
user: this.accountability!.user,
collection: this.collection,
ip: this.accountability!.ip,
user_agent: this.accountability!.userAgent,
origin: this.accountability!.origin,
item: primaryKey,
})
// If revisions are tracked, create revisions record
if (this.schema.collections[this.collection]!.accountability === `all`) {
const revisionsService = new RevisionsService({
knex: trx,
schema: this.schema,
})
const revisionDelta = await payloadService.prepareDelta(payloadAfterHooks)
const revision = await revisionsService.createOne({
activity: activity,
collection: this.collection,
item: primaryKey,
data: revisionDelta,
delta: revisionDelta,
})
// Make sure to set the parent field of the child-revision rows
const childrenRevisions = [...revisionsM2O, ...revisionsA2O, ...revisionsO2M]
if (childrenRevisions.length > 0) {
await revisionsService.updateMany(childrenRevisions, { parent: revision })
}
if (opts.onRevisionCreate) {
opts.onRevisionCreate(revision)
}
}
}
}
return itemsAnalyzingValues
})
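		// The transaction has been committed; emit the action events for each created
		// item, mirroring what createOne() does after its own transaction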
itemsValues.forEach(({
primaryKey,
payload,
nestedActionEventsM2O,
nestedActionEventsA2O,
nestedActionEventsO2M,
}) => {
if (opts.emitEvents !== false) {
const actionEvent = {
event:
this.eventScope === `items`
? [`items.create`, `${this.collection}.items.create`]
: `${this.eventScope}.create`,
meta: {
payload,
key: primaryKey,
collection: this.collection,
},
context: {
database: getDatabase(),
schema: this.schema,
accountability: this.accountability,
},
}
if (opts.bypassEmitAction) {
opts.bypassEmitAction(actionEvent)
}
else {
emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context)
}
				const nestedActionEvents = [
					...(nestedActionEventsO2M ?? []), // optional in ItemValues, so guard against undefined
					...nestedActionEventsA2O,
					...nestedActionEventsM2O,
				]
for (const nestedActionEvent of nestedActionEvents) {
if (opts.bypassEmitAction) {
opts.bypassEmitAction(nestedActionEvent)
}
else {
emitter.emitAction(
nestedActionEvent.event,
nestedActionEvent.meta,
nestedActionEvent.context
)
}
}
}
})
if (shouldClearCache(this.cache, opts, this.collection)) {
await this.cache.clear()
}
return itemsValues.map(({ primaryKey }) => primaryKey)
}
}
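To use it, instantiate the service like any other Directus service and pass all the notifications in one call. A sketch from a custom endpoint extension (the route, payload shape, and import path are illustrative assumptions):
import { defineEndpoint } from '@directus/extensions-sdk'
import { CustomNotificationService } from './custom-notification-service' // hypothetical path

export default defineEndpoint((router, { database, getSchema }) => {
	router.post(`/bulk-notify`, async (req, res, next) => {
		try {
			const service = new CustomNotificationService({
				// Passing accountability keeps permission checks, activity and revisions
				accountability: (req as any).accountability,
				schema: await getSchema(),
				knex: database,
			})
			const keys = await service.createManyAtOnce(req.body.notifications)
			res.json({ keys })
		}
		catch (err) {
			next(err)
		}
	})
})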
With this, inserting 1,000 rows takes less than 1 second instead of ~50 seconds.
Also beware: this was written against Directus 10.4.3, so the internals may have changed quite a lot since.