
Use ticker to periodically load all the files in memory from a path which keeps changing frequently?

I have an application which needs to read files from two different paths. After reading all these files, I need to load them into memory in the products map.


The watchDeltaPath method below is called during server startup to watch for delta changes. It gets the delta path from the GetDeltaPath method, and I need to load all the files from that path into memory. This delta path changes every few minutes, and I cannot miss any delta path or any of the files in it.

Loading all files in memory from loadAllFiles method can take some time (approx 5mins) so I am trying to find a way where I should not miss any new delta path (as it keeps changing every few minutes) and should be able to load all those files in memory from the delta path again and again periodically without any issue and efficiently.

I have the code below, which runs every 1 minute, looks for a new delta path each time, and then loads all the files from that path into memory. It works fine, but I don't think this is the right approach. What happens if the loadAllFiles method takes more than 10 minutes to load all the files into memory while my ticker fires every 1 minute to look for a new delta path, find all the files in that new path, and load them into memory? Will it keep creating a lot of background threads and maybe increase CPU usage by a lot?

type applicationRepository struct {
  client         customer.Client
  logger         log.Logger
  done           chan struct{}
  products       *cmap.ConcurrentMap

// this will be called only once
func (r *applicationRepository) watchDeltaPath() error {
    ticker := time.NewTicker(1 * time.Minute)
    go func() {
        select {
        case <-r.done:
        case <-ticker.C:
            func() (result error) {
                trans := r.logger.StartTransaction(nil, "delta-changes", "")
                defer trans.End()
                defer func() {
                    if result != nil {
                        trans.Errorf("Recovered from error: %v")
                    } else if err := recover(); err != nil {
                        trans.Errorf("Recovered from panic: %v", err)
                // get latest delta path everytime as it keeps changing every few minutes
                path, err := r.client.GetDeltaPath("delta")
                if err != nil {
                    return err
                // load all the files in memory in "products" map from that path
                err = r.loadAllFiles(path)
                if err != nil {
                    return err
                return nil
    return nil

func (r *applicationRepository) Stop() {
    r.done <- struct{}{}

What is the best way to do this efficiently in prod?

Here is my play with code on how it is being executed - https://go.dev/play/p/FS4-B0FWwTe


  • As per the comments, the "best way to do this efficiently in prod" depends on a lot of factors and is probably not answerable on a site like Stack Overflow. Having said that, I can suggest an approach that might make it easier to think about how the problem could be best solved.

    The code below (playground; pretty rough and untested) demonstrates an approach with three goroutines:

    1. Detects new delta paths and pushes them to a buffered channel
    2. Handles the initial load
    3. Waits for initial load to finish then applies deltas (note that this does process deltas found while the initial load is underway)

    As mentioned above, there is insufficient detail in the question to ascertain whether this is a good approach. It may be that the initial load and deltas can run simultaneously without saturating the IO, but that would require testing (and would be a relatively small change).

    // Simulation of process to perform initial load and handle deltas
    package main
    import (
        "fmt"
        "strconv"
        "sync"
        "time"
    )
    // Tunables for the simulation.
    const (
        // deltaBuffer is the capacity of the channel that queues detected delta paths.
        deltaBuffer = 100
        // initialLoadTime is how long the simulated initial load sleeps.
        initialLoadTime = 1500 * time.Millisecond
        // deltaCheckFrequency is the interval between delta path checks.
        deltaCheckFrequency = 500 * time.Millisecond
    )
    func main() {
        ar := NewApplicationRepository()
        time.Sleep(5 * time.Second)
        fmt.Println(time.Now(), "complete")
    // applicationRepository carries the channels and wait group that coordinate
    // the simulation's goroutines.
    type applicationRepository struct {
        deltaChan       chan string   // Could be some other type...
        initialLoadDone chan struct{} // Closed when initial load finished

        done chan struct{}  // the goroutines stop when a receive from it succeeds
        wg   sync.WaitGroup // each goroutine calls Done on it when it exits
    }
    func NewApplicationRepository() *applicationRepository {
        ar := applicationRepository{
            deltaChan:       make(chan string, deltaBuffer),
            initialLoadDone: make(chan struct{}),
            done:            make(chan struct{}),
        go ar.detectNewDeltas()
        go ar.initialLoad()
        go ar.deltaLoad()
        return &ar
    // detectNewDeltas - watch for new delta paths
    func (a *applicationRepository) detectNewDeltas() {
        defer a.wg.Done()
        var previousDelta string
        for {
            select {
            case <-time.After(deltaCheckFrequency):
                dp := a.getDeltaPath()
                if dp != previousDelta {
                    select {
                    case a.deltaChan <- dp:
                        panic("channel full - no idea what to do here!")
                    previousDelta = dp
            case <-a.done:
    // getDeltaPath in real application this will retrieve the delta path
    func (a *applicationRepository) getDeltaPath() string {
        return strconv.Itoa(time.Now().Second()) // For now just return the current second..
    // initialLoad - load the initial data
    func (a *applicationRepository) initialLoad() {
        defer a.wg.Done()
        defer close(a.initialLoadDone)
        time.Sleep(initialLoadTime) // Simulate time taken for initial load
    // deltaLoad- load deltas found by detectNewDeltas
    func (a *applicationRepository) deltaLoad() {
        defer a.wg.Done()
        fmt.Println(time.Now(), "deltaLoad started")
        // Wait for initial load to complete before doing anything
        fmt.Println(time.Now(), "Initial Load Done")
        // Wait for incoming deltas and load them
        for {
            select {
            case newDelta := <-a.deltaChan:
                fmt.Println(time.Now(), newDelta)
            case <-a.done:
    // Stop - signal loader to stop and wait until this is done
    func (a *applicationRepository) Stop() {