Last active
August 21, 2024 06:24
-
-
Save abitofhelp/5ff65b14ac030cffed0cbdeac6ba8449 to your computer and use it in GitHub Desktop.
This gist implements a streaming process where a chunk of data is read from a file in a goroutine, the data is passed through a channel to another goroutine, which writes the data to a file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Copyright (c) 2018 A Bit of Help, Inc. - All Rights Reserved, Worldwide. | |
// Use of this source code is governed by a MIT license that can be found in the LICENSE file. | |
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Package main implements streaming of a file through a buffered channel where it is written to a new file. | |
// A data race condition arose in the for loop that reads data from the input file and sends it to the channel.
// I moved the allocation of the buffer to the top of the loop, to resolve the data race issue. | |
// I've created a companion gist,"go-stream-file-between-goroutines-with-pipe" that resolves the race issue using a | |
// FIFO pipe. It worked, but the price was that it was slower than using a channel. | |
// A quick test with a 3.9GB binary file required 5.3s with a channel, and 12.6s with a pipe. | |
// Package main is the entry point for the application and is responsible for configuring the environment. | |
package main | |
import ( | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"sync" | |
"time" | |
) | |
/************************************************* CONSTANTS **********************************************************/ | |
// Tunables and file locations used by the streaming pipeline.
const (
	// kMaxBufferSize caps how many bytes a single read of the input file may return.
	kMaxBufferSize = 4096

	// kMaxChannelSize is how many []byte chunks the buffered channel can hold.
	kMaxChannelSize = 100

	// kFromPath is the file system path of the file that is read.
	kFromPath = "/home/mjgardner/Downloads/ABigFile.zip"

	// kToPath is the file system path where the data taken from the channel is written.
	kToPath = "/home/mjgardner/Downloads/ANewBigFile.zip"
)
/**********************************************************************************************************************/ | |
/**************************************************** MAIN ************************************************************/ | |
// configureInterfaces opens the input file, creates the output file, and makes
// the buffered channel that connects the reading and writing goroutines.
//
// Parameter channelSize is the number of []byte chunks the channel can buffer.
// Parameter fromPath is the file system path of the file that will be read.
// Parameter toPath is the file system path of the file that will be created;
// os.Create truncates it when it already exists.
//
// On success it returns the channel, the input file handle, the output file
// handle, and a nil error. On failure the error is logged, any file opened so
// far is closed, and nil, nil, nil plus the error are returned so the caller
// decides how to terminate.
func configureInterfaces(channelSize uint64, fromPath string, toPath string) (chan []byte, *os.File, *os.File, error) {
	// Open the file at fromPath for reading...
	ifile, err := os.Open(fromPath)
	if err != nil {
		log.Print(err)
		return nil, nil, nil, err
	}

	// Creating the output truncates an existing file, so refuse a destination
	// that is the very file being read; otherwise the input would be emptied
	// before a single byte is copied.
	srcInfo, err := ifile.Stat()
	if err == nil {
		// A missing destination is the normal case; only an existing one can alias the input.
		if dstInfo, statErr := os.Stat(toPath); statErr == nil && os.SameFile(srcInfo, dstInfo) {
			err = errors.New("input and output paths refer to the same file")
		}
	}
	if err != nil {
		// Best-effort cleanup; the error being reported is the one that matters.
		_ = ifile.Close()
		log.Print(err)
		return nil, nil, nil, err
	}

	// Create the output file at toPath for writing...
	ofile, err := os.Create(toPath)
	if err != nil {
		// Do not leak the already opened input handle.
		_ = ifile.Close()
		log.Print(err)
		return nil, nil, nil, err
	}

	// make never returns a nil channel, so no nil check is needed here.
	ch := make(chan []byte, channelSize)

	return ch, ifile, ofile, nil
}
// Function main is the entry point for the application and is responsible for configuring its environment. | |
func main() { | |
// Variable wg is main's WaitGroup, which detects when all of the goroutines that were launched have completed. | |
var wg sync.WaitGroup | |
// Start our timer... | |
start := time.Now() | |
// Create and configure the interfaces used by the application, such as the file system and channel. | |
ch, ifile, ofile, err := configureInterfaces(kMaxChannelSize, kFromPath, kToPath) | |
if err != nil { | |
// Error has already been logged. | |
return | |
} | |
// Automatically close the files when exiting. | |
defer ifile.Close() | |
defer ofile.Close() | |
fmt.Println("Starting processing...\n") | |
// File writing goroutine. | |
wg.Add(1) | |
go func(ch <-chan []byte, of *os.File) { | |
defer wg.Done() | |
// Receive the input file's data through the buffered channel. | |
receiver(ch, of) | |
}(ch, ofile) | |
// File reading goroutine | |
wg.Add(1) | |
go func(ch chan<- []byte, ifile *os.File) { | |
defer wg.Done() | |
// Write the input file's data to the buffered channel. | |
sender(ifile, ch) | |
}(ch, ifile) | |
// Wait here until all goroutines have completed their work. | |
wg.Wait() | |
// Show the duration. | |
fmt.Printf("\nDone processing...\nElapsed: %s", time.Since(start)) | |
} | |
// Function sender write the input file's data to the buffered channel. | |
// Parameter if is the input file's handle. | |
// Parameter ch is the buffered channel to which data will be written. | |
func sender(ifile *os.File, ch chan<- []byte) error { | |
// Cumulative counters | |
nBytes := uint64(0) | |
nChunks := uint64(0) | |
fmt.Println("\tSender is starting to read the input file...") | |
// Loop through the input file reading chunks of data, which is sent over the channel. | |
for { | |
// The buffer for data that is read from the file. It is created on each | |
// cycle through the loop to avoid a race condition with ifile.Read(). | |
buf := make([]byte, kMaxBufferSize) | |
// Read a chunk of data from the file... | |
n, err := ifile.Read(buf[:cap(buf)]) | |
// Did we read any data from the file? Was there an error? | |
if n == 0 { | |
if err == nil { | |
// No data and no error; Keep going... | |
continue | |
} | |
if err == io.EOF { | |
// End of file, so exit the loop... | |
break | |
} | |
// Ouch! Log the error and exit. | |
log.Fatal(err) | |
return err | |
} | |
// Update the cumulative counters. | |
nChunks++ | |
nBytes += uint64(n) | |
// Send the data over the channel. | |
ch <- buf[:n] | |
} | |
// Signal the receiving goroutines that there is no more data. | |
fmt.Println("\tSender is closing the channel to signal the receiver that no more data is coming, and exiting...") | |
close(ch) | |
// When there is no more data to process, display the sender's status. | |
fmt.Printf("\tSent:\t\tnBytes: %d, nChunks: %d\n", nBytes, nChunks) | |
return nil | |
} | |
// receiver takes chunks from the channel until it is closed and writes each
// one to the output file.
//
// Parameter ch is the buffered channel from which data will be read.
// Parameter of is the output file's handle.
//
// The first write error is logged and no further writes are attempted, but the
// channel is still drained so that a sender blocked on a full channel can
// finish. The reported counters cover only data that was actually written.
func receiver(ch <-chan []byte, of *os.File) {
	// Cumulative counters
	var nBytes, nChunks uint64

	// writeFailed is set after the first failed write.
	writeFailed := false

	fmt.Println("\tReceiver is waiting for data in the channel, so it can write it to the output file...")

	// While there is data to read in the channel, get it and write it to the output file.
	for data := range ch {
		if writeFailed {
			// Discard the chunk; receiving it is what keeps the sender moving.
			continue
		}

		// Write the chunk to the output file.
		n, err := of.Write(data)

		// Count the bytes that reached the file, even on a short write.
		nBytes += uint64(n)
		if err != nil {
			log.Print(err)
			writeFailed = true
			continue
		}
		nChunks++
	}

	fmt.Println("\tReceiver has emptied the channel and is exiting...")

	// When there is no more data to process, display the receiver's status.
	fmt.Printf("\tReceived:\tnBytes: %d, nChunks: %d\n", nBytes, nChunks)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment