Skip to content

Instantly share code, notes, and snippets.

@gvoysey
Created October 8, 2015 20:44
Show Gist options
  • Save gvoysey/bbd1d24a4d0136884435 to your computer and use it in GitHub Desktop.
Save gvoysey/bbd1d24a4d0136884435 to your computer and use it in GitHub Desktop.
threaded writer for large files
/// <summary>
/// Process-wide singleton that hands serialized objects to per-file worker threads,
/// so callers do not wait for disk writes. Each target file gets its own queue and
/// worker; when several items pile up for the same file only the newest is written.
/// </summary>
public class QueuedWriter
{
    // Guards singleton creation and the per-file queue table.
    private static readonly object _lockObject = new object();

    // volatile so the unlocked fast-path read in Instance never sees a stale reference.
    private static volatile QueuedWriter _queuedWriter;

    // One worker queue per target file, keyed by the path exactly as the caller supplied it.
    private readonly Dictionary<string, WorkerQueue> _workQueue = new Dictionary<string, WorkerQueue>();

    private QueuedWriter()
    {
    }

    /// <summary>
    /// Gets the single shared instance, creating it on first use.
    /// </summary>
    public static QueuedWriter Instance
    {
        get
        {
            if (_queuedWriter == null)
            {
                lock (_lockObject)
                {
                    // Re-check under the lock: another thread may have created the
                    // instance between the unlocked test above and acquiring the lock.
                    if (_queuedWriter == null)
                    {
                        _queuedWriter = new QueuedWriter();
                    }
                }
            }
            return _queuedWriter;
        }
    }

    /// <summary>
    /// Gets or sets the user id from which the key bytes handed to new worker queues are derived.
    /// </summary>
    public int UserID { get; set; }

    // First 16 bytes of the UTF-16 encoding of UserID.GetSHA1Hash(); used as the cipher key
    // by WorkerQueue.Compress/Decompress.
    // NOTE(review): GetSHA1Hash is declared elsewhere - presumably it returns a hex string; confirm.
    private byte[] Pw
    {
        get { return new UnicodeEncoding().GetBytes(UserID.GetSHA1Hash()).Take(16).ToArray(); }
    }

    /// <summary>
    /// Queues <paramref name="serializedObject"/> to be written to <paramref name="filePath"/>
    /// by that file's worker thread, creating the worker queue on first use of the path.
    /// </summary>
    /// <param name="filePath">Destination file; also the key that selects the worker queue.</param>
    /// <param name="serializedObject">Complete new contents for the file.</param>
    public void Queue(string filePath, string serializedObject)
    {
        lock (_lockObject)
        {
            WorkerQueue queue;
            if (!_workQueue.TryGetValue(filePath, out queue))
            {
                // The key is captured once, from the UserID in effect when the queue is created.
                queue = new WorkerQueue(filePath, Pw);
                _workQueue.Add(filePath, queue);
            }
            queue.Queue(serializedObject);
        }
    }

    /// <summary>
    /// Pending writes for one file plus the thread that drains them.
    /// </summary>
    private sealed class WorkerQueue : IDisposable
    {
        //what a hack.
        // NOTE(review): a fixed IV means equal plaintexts under the same key produce equal
        // ciphertexts. Changing it would make previously written files unreadable, so it is
        // kept as-is here.
        private static readonly byte[] Iv = {233, 193, 128, 75, 176, 243, 223, 158, 227, 135, 95, 41, 221, 252, 160, 107};

        // Per-instance lock: queues for different files must not block each other.
        private readonly object _lockObject = new object();
        private readonly Queue<string> _workQueue = new Queue<string>();
        private readonly string _filePath;
        // Per-instance key: a later queue must not overwrite the key of an earlier one.
        private readonly byte[] _pw;
        private Thread _thread;
        // True from the moment a worker is started until it has decided, under the lock, to exit.
        // Thread.IsAlive cannot be used for this: a worker that has already chosen to exit is
        // still "alive" for a moment, and an item enqueued in that window would never be written.
        private bool _workerRunning;
        private bool _disposed;

        /// <summary>
        /// Creates an idle queue for <paramref name="filePath"/>; no thread is started until
        /// the first item is queued.
        /// </summary>
        /// <param name="filePath">File this queue writes to.</param>
        /// <param name="pw">Key bytes used by <see cref="Compress"/> and <see cref="Decompress"/>.</param>
        public WorkerQueue(string filePath, byte[] pw)
        {
            _filePath = filePath;
            _pw = pw;
        }

        private static void Log(string msg, bool isThisWrong = false)
        {
            var fullMsg = "DataManager: " + msg;
            if (isThisWrong)
            {
                EngineLog.Warning(fullMsg);
            }
            else
            {
                EngineLog.Print(fullMsg);
            }
        }

        /// <summary>
        /// Adds an item to the queue and starts a worker thread if none is currently draining it.
        /// </summary>
        /// <param name="serializedObject">Complete new contents for the file.</param>
        public void Queue(string serializedObject)
        {
            lock (_lockObject)
            {
                Log(string.Format("queueing item to be saved in {0}", _filePath));
                _workQueue.Enqueue(serializedObject);
                if (!_workerRunning)
                {
                    // Foreground thread, so the process does not exit while a write is pending.
                    _thread = new Thread(Worker)
                    {
                        IsBackground = false
                    };
                    _thread.Start();
                    // Set only after Start succeeded. The new thread cannot observe the flag
                    // earlier because it needs this lock before it does anything with it.
                    _workerRunning = true;
                }
            }
        }

        // Drains the queue: repeatedly takes the newest pending item (dropping older ones)
        // and writes it, exiting once the queue is empty.
        private void Worker()
        {
            Log(string.Format("worker {0} starting", _filePath));
            try
            {
                while (true)
                {
                    string workItem;
                    lock (_lockObject)
                    {
                        //if we're done, kill the thread.
                        if (_workQueue.Count == 0)
                        {
                            Log(string.Format("worker {0} exiting", _filePath));
                            // Last action under the lock, and nothing after it can throw:
                            // from here on Queue() is free to start a fresh worker.
                            _workerRunning = false;
                            return;
                        }
                        //throw away everything except the newest
                        Log(string.Format("worker {0} discarding {1} old items", _filePath, _workQueue.Count - 1));
                        while (_workQueue.Count > 1)
                        {
                            _workQueue.Dequeue();
                        }
                        //get the newest version of the string to save.
                        workItem = _workQueue.Dequeue();
                    }
                    // Disk I/O happens outside the lock so producers are never blocked by a
                    // slow write. Only one worker per queue exists, so writes cannot overlap.
                    WriteToDisk(workItem);
                }
            }
            catch
            {
                // The worker is going away; make sure a later Queue() can start a new one.
                lock (_lockObject)
                {
                    _workerRunning = false;
                }
                throw;
            }
        }

        // Writes the contents to _filePath. An existing file is replaced via a temporary
        // file so a failure part-way through the write leaves the old contents in place.
        private void WriteToDisk(string contents)
        {
            if (!File.Exists(_filePath))
            {
                File.WriteAllText(_filePath, contents);
                return;
            }
            // Suffixes are appended to the full path (not swapped for the extension) so that
            // e.g. "a.json" and "a.xml", written by different workers, never share temp files.
            var newfile = _filePath + ".new";
            var bakfile = _filePath + ".bak";
            File.WriteAllText(newfile, contents);
            File.Replace(newfile, _filePath, bakfile);
            File.Delete(bakfile);
        }

        // Gzips and then encrypts the file into "<name>.gz", then puts the result under the
        // original name. Hidden files and files already ending in ".gz" are left untouched.
        private void Compress(FileInfo fi)
        {
            // Prevent compressing hidden and already compressed files. Returning here matters:
            // falling through would delete a source file that was never compressed.
            var isHidden = (File.GetAttributes(fi.FullName) & FileAttributes.Hidden) == FileAttributes.Hidden;
            if (isHidden || fi.Extension == ".gz")
            {
                return;
            }

            var compressedPath = fi.FullName + ".gz";
            // Read the size now; the source file is deleted below.
            var originalLength = fi.Length;

            using (var inFile = fi.OpenRead())
            using (var outFile = File.Create(compressedPath))
            using (var rijndael = Rijndael.Create())
            using (var encryptor = rijndael.CreateEncryptor(_pw, Iv))
            using (var encrypt = new CryptoStream(outFile, encryptor, CryptoStreamMode.Write))
            using (var compress = new GZipStream(encrypt, CompressionMode.Compress))
            {
                // Copy the source file into the compression stream.
                var buffer = new byte[4096];
                int numRead;
                while ((numRead = inFile.Read(buffer, 0, buffer.Length)) != 0)
                {
                    compress.Write(buffer, 0, numRead);
                }
            }

            // Measure only after the streams are closed: the gzip trailer and the final
            // cipher block are not written until the streams are disposed.
            var compressedLength = new FileInfo(compressedPath).Length;
            Log(string.Format("Compressed {0} from {1} to {2} bytes.", fi.Name, originalLength, compressedLength));

            //delete the source file, move the compressed file to the source file's name.
            // NOTE(review): File.Copy leaves "<name>.gz" on disk as well; if a true move is
            // intended this should be File.Move - confirm before changing.
            File.Delete(fi.FullName);
            File.Copy(compressedPath, fi.FullName);
        }

        // Decrypts and then gunzips the file into a sibling file whose name is the input's
        // name with its extension removed (for example report.doc.gz -> report.doc).
        // NOTE(review): for an input without an extension the output path equals the input
        // path, so File.Create targets the file that is open for reading - confirm callers
        // never pass such a file.
        private void Decompress(FileInfo fi)
        {
            var curFile = fi.FullName;
            var origName = curFile.Remove(curFile.Length - fi.Extension.Length);

            using (FileStream inFile = fi.OpenRead())
            using (var outFile = File.Create(origName))
            using (var rijndael = Rijndael.Create())
            using (var decryptor = rijndael.CreateDecryptor(_pw, Iv))
            using (var decrypt = new CryptoStream(inFile, decryptor, CryptoStreamMode.Read))
            using (var decompress = new GZipStream(decrypt, CompressionMode.Decompress))
            {
                //Copy the decompression stream into the output file.
                var buffer = new byte[4096];
                int numRead;
                while ((numRead = decompress.Read(buffer, 0, buffer.Length)) != 0)
                {
                    outFile.Write(buffer, 0, numRead);
                }
                Log(string.Format("Decompressed: {0}", fi.Name));
            }
        }

        /// <summary>
        /// Waits up to 60 seconds for the current worker thread, if any, to finish.
        /// Calling it more than once is harmless.
        /// </summary>
        public void Dispose()
        {
            Thread worker;
            lock (_lockObject)
            {
                if (_disposed)
                {
                    return;
                }
                _disposed = true;
                worker = _thread;
            }
            // Join outside the lock: the worker needs the lock to notice the queue is empty.
            if (worker != null && worker.IsAlive)
            {
                worker.Join(60000);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment