Created
December 18, 2020 07:27
-
-
Save SamVanhoutte/6eb486b4893d5ca2f78010966c5596dd to your computer and use it in GitHub Desktop.
Data lake file transmitter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Azure.Storage.Files.DataLake; | |
using System.IO; | |
using System.Net; | |
using Azure; | |
using Azure.Storage; | |
using Azure.Storage.Files.DataLake.Models; | |
using Azure.Storage.Sas; | |
using ServiceStack; | |
namespace Savanh.Adapters.DataLake | |
{ | |
/// <summary>
/// Base class for transmitters that serialize records of type <typeparamref name="T"/>
/// to CSV text and store that text in a data lake directory through an inner
/// <see cref="DataLakeTransmitter"/>.
/// </summary>
/// <typeparam name="T">Record type; serialized with the <c>ToCsv()</c> extension method.</typeparam>
public abstract class DataLakeTransmitter<T> where T : new()
{
    // Non-generic transmitter that performs the actual storage calls.
    private readonly DataLakeTransmitter _dataLakeClient;

    /// <summary>
    /// Creates the inner transmitter for the given storage settings and directory.
    /// </summary>
    /// <param name="settings">Storage settings forwarded to the inner transmitter.</param>
    /// <param name="directoryName">Directory name forwarded to the inner transmitter.</param>
    public DataLakeTransmitter(StorageSettings settings, string directoryName)
    {
        _dataLakeClient = new DataLakeTransmitter(settings, directoryName);
    }

    /// <summary>
    /// Delegates to the inner transmitter's <c>CreateDirectoryIfNotExists</c>.
    /// </summary>
    /// <returns>The directory client returned by the inner transmitter.</returns>
    protected async Task<DataLakeDirectoryClient> CreateDirectoryIfNotExists()
    {
        return await _dataLakeClient.CreateDirectoryIfNotExists();
    }

    /// <summary>
    /// Serializes <paramref name="record"/> to CSV and appends the text to the named file.
    /// </summary>
    /// <param name="fileName">Name of the file to append to.</param>
    /// <param name="record">Record to serialize.</param>
    /// <param name="includeHeader">
    /// When <c>false</c> (the default) the first line of the CSV output is dropped before appending.
    /// </param>
    /// <returns>The result of the inner transmitter's <c>AppendToFile</c>.</returns>
    protected async Task<bool> AppendToFile(string fileName, T record, bool includeHeader = false)
    {
        var csvText = GetRecord(record, includeHeader);
        return await _dataLakeClient.AppendToFile(fileName, csvText);
    }

    /// <summary>
    /// Serializes <paramref name="record"/> to CSV (first line dropped) and writes the text
    /// to the named file through the inner transmitter's <c>WriteText</c>.
    /// </summary>
    /// <param name="fileName">Name of the file to write.</param>
    /// <param name="record">Record to serialize.</param>
    /// <returns>The result of the inner transmitter's <c>WriteText</c>.</returns>
    protected async Task<bool> WriteFile(string fileName, T record)
    {
        var csvText = GetRecord(record, false);
        return await _dataLakeClient.WriteText(fileName, csvText);
    }

    /// <summary>
    /// Converts a record to CSV text, optionally removing the first line of the output.
    /// </summary>
    /// <param name="record">Record to serialize.</param>
    /// <param name="includeHeader">
    /// <c>true</c> to return the CSV text unchanged; <c>false</c> to skip its first line
    /// (presumably the header row emitted by the serializer - verify against ServiceStack).
    /// </param>
    /// <returns>The CSV text, with or without its first line.</returns>
    private string GetRecord(T record, bool includeHeader)
    {
        var csvText = record.ToCsv();
        if (includeHeader)
        {
            return csvText;
        }

        // Read the text directly instead of round-tripping it through UTF-8 bytes and a
        // StreamReader; the reader is disposed once the remainder has been extracted.
        using (var reader = new StringReader(csvText))
        {
            reader.ReadLine(); // discard the first line
            return reader.ReadToEnd();
        }
    }
}
/// <summary>
/// Reads and writes files inside one directory of the "traiders" data lake file system,
/// authenticating with the account name and account key taken from the supplied settings.
/// </summary>
public class DataLakeTransmitter
{
    private const string FileSystemName = "traiders";

    // Service error codes this class reacts to.
    private const string NotFoundErrorCode = "BlobNotFound";
    private const string FileSystemExistsErrorCode = "ContainerAlreadyExists";

    private readonly StorageSettings _settings;
    private readonly string _directoryName;

    // Lazily created clients, cached after first use.
    private DataLakeServiceClient _dataLakeService;
    private DataLakeFileSystemClient _fileSystemClient;
    private DataLakeDirectoryClient _directory;

    /// <summary>
    /// Stores the settings and directory name; no service call is made here.
    /// </summary>
    /// <param name="settings">Provides <c>AccountName</c> and <c>AccountKey</c>.</param>
    /// <param name="directoryName">Directory (inside the file system) that all operations target.</param>
    public DataLakeTransmitter(StorageSettings settings, string directoryName)
    {
        _settings = settings;
        _directoryName = directoryName;
    }

    /// <summary>
    /// Service client for <c>https://{AccountName}.dfs.core.windows.net</c>, created on first
    /// access with a shared key credential and reused afterwards.
    /// </summary>
    private DataLakeServiceClient DataLakeService
    {
        get
        {
            if (_dataLakeService == null)
            {
                var sharedKeyCredential =
                    new StorageSharedKeyCredential(_settings.AccountName, _settings.AccountKey);
                string dfsUri = "https://" + _settings.AccountName + ".dfs.core.windows.net";
                _dataLakeService = new DataLakeServiceClient(new Uri(dfsUri), sharedKeyCredential);
            }
            return _dataLakeService;
        }
    }

    /// <summary>
    /// File system client for <see cref="FileSystemName"/>. On first access the file system is
    /// created; if the service reports that it already exists, a client for the existing one is
    /// used instead. Any other service failure propagates to the caller instead of being
    /// swallowed (which previously left this property returning <c>null</c>).
    /// </summary>
    private DataLakeFileSystemClient FileSystem
    {
        get
        {
            if (_fileSystemClient == null)
            {
                try
                {
                    // Synchronous overload: a property getter cannot await, and blocking on
                    // the async call with .Result risks deadlocks.
                    _fileSystemClient = DataLakeService.CreateFileSystem(FileSystemName).Value;
                }
                catch (RequestFailedException ex) when (ex.ErrorCode == FileSystemExistsErrorCode)
                {
                    _fileSystemClient = DataLakeService.GetFileSystemClient(FileSystemName);
                }
            }
            return _fileSystemClient;
        }
    }

    /// <summary>
    /// Reports whether the named file has content.
    /// </summary>
    /// <param name="fileName">File name inside the configured directory.</param>
    /// <returns>
    /// <c>true</c> when the file's content length is greater than zero; <c>false</c> when the
    /// length is zero or the service answers with the "BlobNotFound" error code.
    /// </returns>
    /// <exception cref="RequestFailedException">Any other service failure.</exception>
    public async Task<bool> CheckFileExist(string fileName)
    {
        var directory = await CreateDirectoryIfNotExists();
        var fileClient = directory.GetFileClient(fileName);
        try
        {
            var fileProperties = (await fileClient.GetPropertiesAsync()).Value;
            return fileProperties.ContentLength > 0;
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == NotFoundErrorCode)
        {
            return false;
        }
    }

    /// <summary>
    /// Returns a client for the configured directory, creating the directory when the service
    /// reports "BlobNotFound" for it. The client is cached after the first successful call.
    /// </summary>
    /// <returns>The directory client.</returns>
    /// <exception cref="RequestFailedException">
    /// Any service failure other than "BlobNotFound" during the existence check, or a failure
    /// while creating the directory.
    /// </exception>
    public async Task<DataLakeDirectoryClient> CreateDirectoryIfNotExists()
    {
        if (_directory != null)
        {
            return _directory;
        }

        // Work on a local so the field is only cached once the check/creation succeeded.
        var directory = FileSystem.GetDirectoryClient(_directoryName);
        try
        {
            // Check if directory already exists
            await directory.GetPropertiesAsync();
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == NotFoundErrorCode)
        {
            directory = (await FileSystem.CreateDirectoryAsync(_directoryName)).Value;
        }

        _directory = directory;
        return _directory;
    }

    /// <summary>
    /// Appends text, encoded as UTF-8, to the end of the named file; the file is created first
    /// when the service reports "BlobNotFound" for it.
    /// </summary>
    /// <param name="fileName">File name inside the configured directory.</param>
    /// <param name="recordContent">Text to append.</param>
    /// <returns>Always <c>true</c> when no exception is thrown.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recordContent"/> is null.</exception>
    /// <exception cref="RequestFailedException">Any service failure other than "BlobNotFound" on lookup.</exception>
    public async Task<bool> AppendToFile(string fileName, string recordContent)
    {
        if (recordContent == null)
        {
            throw new ArgumentNullException(nameof(recordContent));
        }

        // Encode up front: offsets passed to the service are byte positions, so the byte
        // count (not the character count) must be used for the flush position.
        byte[] recordBytes = Encoding.UTF8.GetBytes(recordContent);

        var directory = await CreateDirectoryIfNotExists();
        var fileClient = directory.GetFileClient(fileName);
        long fileLength = 0;
        try
        {
            var fileProperties = (await fileClient.GetPropertiesAsync()).Value;
            fileLength = fileProperties.ContentLength;
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == NotFoundErrorCode)
        {
            fileClient = (await directory.CreateFileAsync(fileName)).Value;
        }

        using (var recordStream = new MemoryStream(recordBytes))
        {
            await fileClient.AppendAsync(recordStream, fileLength);
        }
        await fileClient.FlushAsync(fileLength + recordBytes.Length);
        return true;
    }

    /// <summary>
    /// Encodes the text as UTF-8 and writes it through <see cref="WriteBytes"/>.
    /// </summary>
    /// <param name="fileName">File name inside the configured directory.</param>
    /// <param name="recordContent">Text to write.</param>
    /// <param name="contentType">Optional content type forwarded to <see cref="WriteBytes"/>.</param>
    /// <returns>The result of <see cref="WriteBytes"/>.</returns>
    public async Task<bool> WriteText(string fileName, string recordContent, string contentType = null)
    {
        return await WriteBytes(fileName, Encoding.UTF8.GetBytes(recordContent), contentType);
    }

    /// <summary>
    /// Creates the named file and writes the given bytes to it starting at offset 0.
    /// </summary>
    /// <param name="fileName">File name inside the configured directory.</param>
    /// <param name="recordContent">Bytes to write.</param>
    /// <param name="contentType">
    /// When not null or empty, set as the file's content type HTTP header on flush.
    /// </param>
    /// <returns>Always <c>true</c> when no exception is thrown.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recordContent"/> is null.</exception>
    public async Task<bool> WriteBytes(string fileName, byte[] recordContent, string contentType = null)
    {
        // Validate before touching the service so a bad argument cannot leave a freshly
        // created file behind.
        if (recordContent == null)
        {
            throw new ArgumentNullException(nameof(recordContent));
        }

        var directory = await CreateDirectoryIfNotExists();
        // NOTE(review): CreateFileAsync is called without conditions - presumably this
        // replaces an existing file of the same name; confirm against the SDK docs.
        DataLakeFileClient fileClient = (await directory.CreateFileAsync(fileName)).Value;

        using (var recordStream = new MemoryStream(recordContent))
        {
            await fileClient.AppendAsync(recordStream, 0);
        }

        // Null headers are the parameter's default, so one call covers both cases.
        PathHttpHeaders httpHeaders = string.IsNullOrEmpty(contentType)
            ? null
            : new PathHttpHeaders {ContentType = contentType};
        await fileClient.FlushAsync(recordContent.Length, httpHeaders: httpHeaders);
        return true;
    }

    /// <summary>
    /// Builds a URI for the named file with an account SAS token as its query string. The token
    /// is HTTPS-only, grants read permission, targets the blob service with all resource types,
    /// starts one hour before now and expires <paramref name="validHours"/> hours after now.
    /// </summary>
    /// <param name="fileName">File name inside the configured directory.</param>
    /// <param name="validHours">Hours from now until the token expires.</param>
    /// <returns>The file URI including the SAS query parameters.</returns>
    public async Task<Uri> GenerateSasUri(string fileName, int validHours = 1)
    {
        var directory = await CreateDirectoryIfNotExists();
        var fileClient = directory.GetFileClient(fileName);
        // NOTE(review): this is an account-level SAS with all resource types, not one scoped
        // to the single file - confirm that breadth is intended.
        var sasBuilder = new AccountSasBuilder
        {
            Protocol = SasProtocol.Https,
            Services = AccountSasServices.Blobs,
            ResourceTypes = AccountSasResourceTypes.All,
            StartsOn = DateTimeOffset.UtcNow.AddHours(-1),
            ExpiresOn = DateTimeOffset.UtcNow.AddHours(validHours),
            // NOTE(review): presumably IPAddress.None means "no IP restriction" - confirm.
            IPRange = new SasIPRange(IPAddress.None, IPAddress.None)
        };
        // Allow read access
        sasBuilder.SetPermissions(AccountSasPermissions.Read);
        // Create a SharedKeyCredential that we can use to sign the SAS token
        var credential = new StorageSharedKeyCredential(_settings.AccountName, _settings.AccountKey);
        // Build a SAS URI
        //https://traiders.blob.core.windows.net/traiders/datainput
        var sasUri = new UriBuilder(fileClient.Uri)
        {
            Query = sasBuilder.ToSasQueryParameters(credential).ToString()
        };
        return sasUri.Uri;
    }

    /// <summary>
    /// Downloads the named file into memory.
    /// </summary>
    /// <param name="reportName">File name inside the configured directory.</param>
    /// <returns>The complete file content.</returns>
    public async Task<byte[]> ReadFile(string reportName)
    {
        var directory = await CreateDirectoryIfNotExists();
        var fileClient = directory.GetFileClient(reportName);
        using (var memStream = new MemoryStream())
        {
            await fileClient.ReadToAsync(memStream);
            // ToArray copies the whole buffer regardless of the stream position.
            return memStream.ToArray();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment