Concurrent File Processing with Golang

I was recently asked to add concurrency to a file processing program that used filepath.Walk to discover and process files. This is the solution I came up with.

Naming

The function signature for the Walk func in the standard library is as follows:

func Walk(root string, fn WalkFunc) error

I called mine WalkAsync and added a concurrency argument:

func WalkAsync(path string, concurrency int, fn filepath.WalkFunc) chan error

The solution

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// args are the arguments that will be sent on a channel and passed to the user-specified callback.
type args struct {
	path string
	info os.FileInfo
	err  error
}

// WalkAsync asynchronously walks the file system and calls the supplied callback function on each file or directory.
// path: the directory in which to start the walk.
// concurrency: the number of asynchronous workers to use (values below 1 are treated as 1).
// fn: the callback function to call on each entry.
// The returned channel is closed once the walk has finished and every worker is done.
func WalkAsync(path string, concurrency int, fn filepath.WalkFunc) chan error {
	// With no workers nothing would ever read from locationCh, so always start at least one
	if concurrency < 1 {
		concurrency = 1
	}

	// Create a channel to carry walk results to the workers and a channel to report errors
	locationCh := make(chan args)
	errCh := make(chan error)

	// Start a goroutine to walk the file system and send each entry to the channel
	go func() {
		err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				errCh <- err
				return nil
			}
			locationCh <- args{path: path, info: info, err: err}
			return nil
		})
		if err != nil {
			errCh <- fmt.Errorf("error walking the path: %w", err)
		}
		close(locationCh)
	}()

	// Start multiple goroutines to process entries received from the channel
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range locationCh {
				if err := fn(a.path, a.info, a.err); err != nil {
					errCh <- err
				}
			}
		}()
	}

	// Close errCh only after every worker has finished, so nothing sends on a closed channel
	go func() {
		wg.Wait()
		close(errCh)
	}()

	return errCh
}

Explanation

The function creates a channel to receive the file paths and an error channel to receive errors. It then starts a goroutine to walk the file system and sends the file paths it finds to the locationCh channel as a custom args struct.

Multiple goroutines are then started to process the file paths received from the locationCh channel. The number of goroutines is set to the value of the concurrency argument. Each of these goroutines executes the callback function on the file path, file info, and error it receives from the channel.

If the callback function returns an error, this error is sent to the errCh channel.

Finally, the errCh channel is returned by the function to allow the caller to receive any errors that occurred during the file system walk or the execution of the callback function on the files.

Implementing this function

The main difference between this asynchronous function and filepath.Walk in the Go standard library is that the asynchronous version returns a channel of errors. Because the work happens concurrently, several callbacks can fail independently, so we want the ability to report multiple errors rather than a single return value.

When using this function, we loop over this channel and handle any errors sent on it. Once the channel has been closed, we know that all the work has been done. Here is an example of using this function:

func main() {
	ourCallback := func(path string, info os.FileInfo, err error) error {
		fmt.Println(path)
		return nil
	}
	errCh := WalkAsync("/some/dir", 5, ourCallback)

	// This loop will continue until the channel is closed.
	// WalkAsync closes the channel when its work is finished.
	for err := range errCh {
		if err != nil {
			fmt.Println(err.Error())
		}
	}
}

Final Thoughts

I think that this function strikes a good balance between readability and complexity. Sending work on a channel and starting goroutines in a for loop is a pattern familiar to many Go programmers.

The function signature is clear and concise, and very similar to the original single-threaded function provided in the Go standard library.