bulkinsertdirectus

DIRECTUS: 1 BULK INSERT when creating X items, instead of (1 INSERT per item) × X


I'm trying to find a way to have Directus execute 1 BULK INSERT in PostgreSQL when dealing with an items.create scenario in a child table, instead of 1 INSERT per item to create (which I see in the DB logs) — to optimize performance, especially when dealing with hundreds or even thousands of items.

Thanks for any help.

Context: our mobile app calls our API and posts hundreds of values, which triggers a flow. Each of these values is then inserted one at a time (= 1 INSERT statement per item) into a child table of SESSIONS, resulting in poor performance.


Solution

  • Directus ItemsService doesn't implement a createMany() based on Knex's batchInsert, so there is no simple way to achieve this.

    Disabling Activity logging or Revisions is not enough, and neither is disabling filters or action triggers.

    I personally needed to send notifications to thousands of users so I transposed the content of ItemsService.createOne() to a customized NotificationsService.createManyAtOnce():

    import { Action } from '@directus/constants'
    import {
      InvalidPayloadError,
    }from '@directus/api/errors/index'
    import { 
      translateDatabaseError,
    } from '@directus/api/database/errors/translate'
    import { SQLError } from '@directus/api/database/errors/dialects/types'
    import getDatabase from '@directus/api/database/index'
    import { getHelpers } from '@directus/api/database/helpers/index'
    import emitter from '@directus/api/emitter'
    import { shouldClearCache } from '@directus/api/utils/should-clear-cache'
    
    
    import { 
      NotificationsService,
      PayloadService,
      AuthorizationService,
    } from '@directus/api/services/index'
    import { 
      type AbstractServiceOptions,
      type MutationOptions,
      type PrimaryKey,
      type Item as AnyItem,
      type ActionEventParams,
    } from '@directus/api/dist/types'
    import type { Notification } from '@directus/types'
    
    import { cloneDeep, pick, without } from 'lodash-es'
    
    
    /**
     * NotificationsService extension that adds a bulk-create method backed by a
     * single Knex `batchInsert` instead of one INSERT per item.
     *
     * `createManyAtOnce()` is a transposition of Directus' `ItemsService.createOne()`
     * logic (filter hooks, permission validation, payload processing, activity and
     * revision tracking, action events) onto a batched insert, so per-item hooks
     * and accountability still run while the table write happens in one statement.
     */
    export class CustomNotificationService extends NotificationsService {
      // Reference to the parent prototype so overrides can delegate to the
      // original implementations (see sendEmail below).
      _parent = NotificationsService.prototype
    
      constructor(options: AbstractServiceOptions) {
        super(options)
      }
    
      /**
       * Forwards to the parent's sendEmail only when the payload explicitly
       * opts in with `sendEmail: true`; otherwise the email is suppressed.
       *
       * NOTE(review): the delegated call is not awaited, so sending is
       * fire-and-forget and a rejection would be unhandled — confirm intended.
       */
      async sendEmail(data: Partial<Notification & { sendEmail: boolean }>) {
        if (data.sendEmail === true) {
          this._parent.sendEmail.apply(this, [data])
        }
      }
    
    
      /**
       * Creates many notifications with ONE bulk INSERT.
       *
       * Phase 1 (per item, pre-insert): run the `*.create` filter hooks,
       * validate the payload against permissions, and process M2O/A2O relations.
       * Phase 2: write all rows with a single `trx.batchInsert(...)` and map the
       * returned keys back onto the items by index.
       * Phase 3 (per item, post-insert): process O2M relations and, when
       * accountability applies, create activity and revision rows.
       * Phase 4 (after commit): emit the `*.create` action events (or hand them
       * to `opts.bypassEmitAction`) and clear the cache if needed.
       *
       * @param data - One partial Notification per item to create.
       * @param opts - Standard Directus mutation options (event/cache/limit control).
       * @returns The primary keys of the created items, in input order.
       */
      async createManyAtOnce(
        data: Partial<Notification>[], 
        opts: MutationOptions = {}
      ): Promise<PrimaryKey[]> {
        if (! opts.mutationTracker) {
          opts.mutationTracker = this.createMutationTracker()
        }
      
        // NOTE(review): the whole batch is counted as a single mutation here,
        // not one per item — confirm this is the desired interaction with
        // mutation-limit tracking.
        if (! opts.bypassLimits) {
          opts.mutationTracker.trackMutations(1)
        }
      
        // Dynamic imports mirror how the upstream service pulls these in.
        const { ActivityService } = await import(`@directus/api/services/activity`)
        const { RevisionsService } = await import(`@directus/api/services/revisions`)
      
        const primaryKeyField = this.schema.collections[this.collection]!.primary
        const fields = Object.keys(this.schema.collections[this.collection]!.fields)
      
        // Alias fields are not real columns and must be stripped before the insert.
        const aliases = Object.values(this.schema.collections[this.collection]!.fields)
        .filter((field) => field.alias === true)
        .map((field) => field.field)
      
        // By wrapping the logic in a transaction, we make sure we automatically roll back all the
        // changes in the DB if any of the parts contained within throws an error. This also means
        // that any errors thrown in any nested relational changes will bubble up and cancel the whole
        // update tree
        // Per-item bookkeeping carried from the pre-insert phase (hooks, payload
        // processing) to the post-insert phase (O2M, activity, revisions, events).
        type ItemValues = {
          primaryKey: PrimaryKey,
          payload: typeof data[number],
          payloadAfterHooks: Partial<typeof data[number]>,
          payloadWithPresets: Partial<typeof data[number]>,
          payloadWithoutAliases: Partial<typeof data[number]>,
          
          revisionsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`revisions`],
          revisionsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`revisions`],
          revisionsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`revisions`],
    
          nestedActionEventsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`nestedActionEvents`],
          nestedActionEventsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`nestedActionEvents`],
          nestedActionEventsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`nestedActionEvents`],
        }
    
    
        const itemsValues: ItemValues[] = await this.knex.transaction(async (trx) => {
          // We're creating new services instances so they can use the transaction as their Knex interface
          const payloadService = new PayloadService(this.collection, {
            accountability: this.accountability,
            knex: trx,
            schema: this.schema,
          })
      
          const authorizationService = new AuthorizationService({
            accountability: this.accountability,
            knex: trx,
            schema: this.schema,
          })
    
          const itemsAnalyzingValues: ItemValues[] = []
    
          // Phase 1: run hooks + payload processing for every item BEFORE any
          // row is written, collecting the processed payloads for the batch insert.
          for (const payloadToClone of data) {
            const payload = cloneDeep(payloadToClone)
      
            // Run all hooks that are attached to this event so the end user has the chance to augment the
            // item that is about to be saved
            const payloadAfterHooks =
              opts.emitEvents !== false
                ? await emitter.emitFilter(
                  this.eventScope === `items`
                    ? [`items.create`, `${this.collection}.items.create`]
                    : `${this.eventScope}.create`,
                  payload,
                  {
                    collection: this.collection,
                  },
                  {
                    database: trx,
                    schema: this.schema,
                    accountability: this.accountability,
                  }
                )
                : payload
      
            if (payloadAfterHooks === undefined) {
              throw new InvalidPayloadError({ reason: `Create filter returned undefined` })
            }
      
            // NOTE(review): transposed from the single-item createOne(). Returning
            // here exits the whole transaction callback with a NON-array value, so
            // `itemsValues.forEach(...)` and the final `.map(...)` below would
            // throw. Confirm whether a filter can actually return a scalar in this
            // context, and if so handle it per item instead of returning.
            if (typeof payloadAfterHooks !== `object`) {
            // primary key(s) or null
              return payloadAfterHooks
            }
      
            const payloadWithPresets = this.accountability
              ? authorizationService.validatePayload(`create`, this.collection, payloadAfterHooks)
              : payloadAfterHooks
      
            if (opts.preMutationError) {
              throw opts.preMutationError
            }
      
            const {
              payload: payloadWithM2O,
              revisions: revisionsM2O,
              nestedActionEvents: nestedActionEventsM2O,
            } = await payloadService.processM2O(payloadWithPresets, opts)
      
            const {
              payload: payloadWithA2O,
              revisions: revisionsA2O,
              nestedActionEvents: nestedActionEventsA2O,
            } = await payloadService.processA2O(payloadWithM2O, opts)
      
            const payloadWithoutAliases = pick(payloadWithA2O, without(fields, ...aliases))
            const payloadWithTypeCasting = await payloadService.processValues(`create`, payloadWithoutAliases)
      
            // In case of manual string / UUID primary keys, the PK already exists in the object we're saving.
            const primaryKey = payloadWithTypeCasting[primaryKeyField]
    
            // batchInsertInput.push(payloadWithoutAliases)
            itemsAnalyzingValues.push({
              primaryKey,
    
              payload,
              payloadAfterHooks,
              payloadWithPresets, 
              payloadWithoutAliases,
    
              revisionsM2O,
              revisionsA2O,
    
              nestedActionEventsM2O,
              nestedActionEventsA2O,
            })
          }
    
          // Phase 2: the single bulk INSERT. `.returning(...)` maps generated keys
          // back by index — presumably PostgreSQL here; drivers without RETURNING
          // support are covered by the TODO below. TODO confirm for other dialects.
          try {
            const results = await trx
            .batchInsert(
              this.collection, 
              itemsAnalyzingValues.map((v) => v.payloadWithoutAliases)
            )
            .returning(primaryKeyField)
    
            results.forEach((result, index) => {
              if (! itemsAnalyzingValues[index]) {
                throw new Error(`No batchInsert itemInput found for index ${index}`)
              }
    
              // RETURNING may yield row objects or bare values depending on driver.
              const returnedKey = typeof result === `object` ? result[primaryKeyField] : result
      
              // A manually supplied key wins over the returned one; UUIDs are
              // normalized through the dialect's schema helper.
              if (this.schema.collections[this.collection]!.fields[primaryKeyField]!.type === `uuid`) {
                itemsAnalyzingValues[index].primaryKey 
                = getHelpers(trx).schema.formatUUID(
                    (itemsAnalyzingValues[index].primaryKey ?? returnedKey) as string
                  )
              }
              else {
                itemsAnalyzingValues[index].primaryKey 
                = itemsAnalyzingValues[index].primaryKey ?? returnedKey
              }
            })
    
          }
          catch (err: unknown) {
            // Convert raw driver errors into Directus' typed database errors.
            throw await translateDatabaseError(err as SQLError)
          }
      
          // TODO
          // Most database support returning, those who don't tend to return the PK anyways
          // (MySQL/SQLite). In case the primary key isn't know yet, we'll do a best-attempt at
          // fetching it based on the last inserted row
          // if (! primaryKey) {
          //   // Fetching it with max should be safe, as we're in the context of the current transaction
          //   const result = await trx.max(primaryKeyField, { as: `id` }).from(this.collection)
          //   .first()
          //   primaryKey = result.id
          //   // Set the primary key on the input item, in order for the "after" event hook to be able
          //   // to read from it
          //   payload[primaryKey] = primaryKey
          // }
      
          // Phase 3: per-item post-insert work. NOTE(review): activity and
          // revision rows are still created with individual INSERTs inside this
          // loop, so with accountability === `all` this loop, not the batch
          // insert, dominates the cost for large batches.
          for (const itemsAnalyzingValue of itemsAnalyzingValues) {
            const {
              primaryKey,
              payloadAfterHooks,
              payloadWithPresets, 
              revisionsM2O,
              revisionsA2O,
            } = itemsAnalyzingValue
    
            const { 
              revisions: revisionsO2M, 
              nestedActionEvents: nestedActionEventsO2M, 
            } = await payloadService.processO2M(
              payloadWithPresets,
              primaryKey,
              opts
            )
    
            // Stored so the post-commit event loop below can emit them.
            itemsAnalyzingValue.nestedActionEventsO2M = nestedActionEventsO2M
      
            // If this is an authenticated action, and accountability tracking is enabled, save activity row
            if (this.accountability 
              && this.schema.collections[this.collection]!.accountability !== null
            ) {
              const activityService = new ActivityService({
                knex: trx,
                schema: this.schema,
              })
      
              const activity = await activityService.createOne({
                action: Action.CREATE,
                user: this.accountability!.user,
                collection: this.collection,
                ip: this.accountability!.ip,
                user_agent: this.accountability!.userAgent,
                origin: this.accountability!.origin,
                item: primaryKey,
              })
      
              // If revisions are tracked, create revisions record
              if (this.schema.collections[this.collection]!.accountability === `all`) {
                const revisionsService = new RevisionsService({
                  knex: trx,
                  schema: this.schema,
                })
      
                const revisionDelta = await payloadService.prepareDelta(payloadAfterHooks)
      
                const revision = await revisionsService.createOne({
                  activity: activity,
                  collection: this.collection,
                  item: primaryKey,
                  data: revisionDelta,
                  delta: revisionDelta,
                })
      
                // Make sure to set the parent field of the child-revision rows
                const childrenRevisions = [...revisionsM2O, ...revisionsA2O, ...revisionsO2M]
      
                if (childrenRevisions.length > 0) {
                  await revisionsService.updateMany(childrenRevisions, { parent: revision })
                }
      
                if (opts.onRevisionCreate) {
                  opts.onRevisionCreate(revision)
                }
              }
            }
          }
      
          return itemsAnalyzingValues
        })
      
    
        // Phase 4 (transaction committed): emit the create action events — or
        // hand them to the caller via opts.bypassEmitAction — for every item.
        itemsValues.forEach(({ 
          primaryKey, 
          payload, 
          nestedActionEventsM2O,
          nestedActionEventsA2O,
          nestedActionEventsO2M,
        }) => {
          if (opts.emitEvents !== false) {
            const actionEvent = {
              event:
                this.eventScope === `items`
                  ? [`items.create`, `${this.collection}.items.create`]
                  : `${this.eventScope}.create`,
              meta: {
                payload,
                key: primaryKey,
                collection: this.collection,
              },
              context: {
                // Post-commit events get the root knex instance, not the (now
                // finished) transaction.
                database: getDatabase(),
                schema: this.schema,
                accountability: this.accountability,
              },
            }
      
            if (opts.bypassEmitAction) {
              opts.bypassEmitAction(actionEvent)
            }
            else {
              emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context)
            }
    
            // nestedActionEventsO2M is always assigned in the transaction's
            // post-insert loop, despite being optional in the ItemValues type.
            const nestedActionEvents = [
              ...nestedActionEventsO2M,
              ...nestedActionEventsA2O,
              ...nestedActionEventsM2O,
            ]
      
            for (const nestedActionEvent of nestedActionEvents) {
              if (opts.bypassEmitAction) {
                opts.bypassEmitAction(nestedActionEvent)
              }
              else {
                emitter.emitAction(
                  nestedActionEvent.event, 
                  nestedActionEvent.meta, 
                  nestedActionEvent.context
                )
              }
            }
          }
        })
    
        if (shouldClearCache(this.cache, opts, this.collection)) {
          await this.cache.clear()
        }
      
        return itemsValues.map(({ primaryKey }) => primaryKey)
      }
    }
    

    With this, inserting 1,000 rows takes less than 1 second instead of 50 seconds.

    Also beware: