From 4ddb8dc44d1fd46d8225d0f0307ace1f93abde5b Mon Sep 17 00:00:00 2001 From: Nathan McRae Date: Sun, 25 Feb 2024 11:24:30 -0800 Subject: [PATCH] Add parallel parsing/serialization for Simple TSV Doesn't give as much of a performance bonus as hoped --- SaneTsv/SaneTsv.cs | 378 ++++++++++++++++++++++++++++++++- SaneTsv/SaneTsvTest/Program.cs | 168 ++++++++++++++- 2 files changed, 537 insertions(+), 9 deletions(-) diff --git a/SaneTsv/SaneTsv.cs b/SaneTsv/SaneTsv.cs index b3bb571..71bef74 100644 --- a/SaneTsv/SaneTsv.cs +++ b/SaneTsv/SaneTsv.cs @@ -644,6 +644,146 @@ public class SaneTsv return Encoding.UTF8.GetBytes(escapedString.ToString()); } + public static byte[] SerializeSimpleTsvParallel(IList header, IList> data) + { + var serialized = new List(); + var escapedString = new StringBuilder(); + + // Serialize header + for (int i = 0; i < header.Count; i++) + { + if (header[i].Contains(':')) + { + throw new Exception($"Column {i} contains the character ':'"); + } + + for (int j = i + 1; j < header.Count; j++) + { + if (header[i] == header[j]) + { + throw new Exception("Column names in header must be unique"); + } + } + + for (int j = 0; j < header[i].Count(); j++) + { + if (header[i][j] == '\n') + { + escapedString.Append("\\n"); + } + else if (header[i][j] == '\t') + { + escapedString.Append("\\t"); + } + else if (header[i][j] == '\\') + { + escapedString.Append("\\\\"); + } + else if (header[i][j] == '#') + { + escapedString.Append("\\#"); + } + else + { + escapedString.Append(header[i][j]); + } + } + + if (i == header.Count - 1) + { + escapedString.Append('\n'); + } + else + { + escapedString.Append('\t'); + } + } + + serialized.AddRange(Encoding.UTF8.GetBytes(escapedString.ToString())); + + // TODO: need to figure out where the crossover it + // Complication: it probably depends on processor count + if (data.Count < 100) + { + serialized.AddRange(Encoding.UTF8.GetBytes(SerializeSimpleTsvParallel(data, 0, data.Count))); + } + else + { + int tasks = Environment.ProcessorCount - 1; + int splitCount = data.Count / tasks; + byte[][] bytes = new byte[tasks][]; + Parallel.For(0, tasks, i => + { + int endIndex; + if (i == tasks - 1) + { + endIndex = data.Count; + } + else + { + endIndex = (i + 1) * splitCount; + } + string escapedString = SerializeSimpleTsvParallel(data, i * splitCount, endIndex); + bytes[i] = Encoding.UTF8.GetBytes(escapedString); + }); + + for (int i = 0; i < tasks; i++) + { + serialized.AddRange(bytes[i]); + } + } + + return serialized.ToArray(); + } + + public static string SerializeSimpleTsvParallel(IList> data, int startIndex, int endIndex) + { + var escapedString = new StringBuilder(); + + // Serialize data + for (int i = startIndex; i < endIndex; i++) + { + for (int j = 0; j < data[i].Count; j++) + { + for (int k = 0; k < data[i][j].Length; k++) + { + if (data[i][j][k] == '\n') + { + escapedString.Append("\\n"); + } + else if (data[i][j][k] == '\t') + { + escapedString.Append("\\t"); + } + else if (data[i][j][k] == '\\') + { + escapedString.Append("\\\\"); + } + else if (data[i][j][k] == '#') + { + escapedString.Append("\\#"); + } + else + { + escapedString.Append(data[i][j][k]); + } + } + + if (j < data[i].Count - 1) + { + escapedString.Append('\t'); + } + else if (i < data.Count - 1) + { + escapedString.Append('\n'); + } + } + } + + return escapedString.ToString(); + } + + public static (string[] columns, string[][] data) ParseSimpleTsv(byte[] inputBuffer) { string[] columnNames = null; @@ -795,6 +935,225 @@ public class SaneTsv return (columnNames, records.ToArray()); } + public static (string[] columns, string[][] data) ParseSimpleTsvParallel(byte[] inputBuffer) + { + string[] columnNames = null; + var headers = new List(); + var fieldBytes = new List(); + int startOfData = -1; + for (int i = 0; i < inputBuffer.Count(); i++) + { + if (inputBuffer[i] == '\\') + { + if (i + 1 == inputBuffer.Count()) + { + throw new Exception($"Found '\\' at end of input"); + } + if (inputBuffer[i + 1] == 'n') + { + fieldBytes.Add((byte)'\n'); + i++; + } + else if (inputBuffer[i + 1] == '\\') + { + fieldBytes.Add((byte)'\\'); + i++; + } + else if (inputBuffer[i + 1] == 't') + { + fieldBytes.Add((byte)'\t'); + i++; + } + else if (inputBuffer[i + 1] == '#') + { + fieldBytes.Add((byte)'#'); + i++; + } + else + { + throw new Exception($"Expected 'n', 't', '#', or '\\' after '\\' at line {1} column {i}"); + } + } + else if (inputBuffer[i] == '\t') + { + // end of field + headers.Add(fieldBytes.ToArray()); + fieldBytes.Clear(); + } + else if (inputBuffer[i] == '\n') + { + // This is the end of the header + headers.Add(fieldBytes.ToArray()); + startOfData = i + 1; + + columnNames = new string[headers.Count]; + fieldBytes.Clear(); + + for (int j = 0; j < headers.Count; j++) + { + string columnString; + try + { + columnString = Encoding.UTF8.GetString(headers[j]); + } + catch (Exception e) + { + throw new Exception($"Column {headers.Count} name is not valid UTF-8", e); + } + + if (columnString.Contains(':')) + { + throw new Exception($"Header {headers.Count} contain ':', which is not allowed for column names"); + } + + columnNames[j] = columnString; + } + + // Done parsing header + break; + } + else if (inputBuffer[i] == '#') + { + throw new Exception($"Found unescaped '#' at line 1, column {i}"); + } + else + { + fieldBytes.Add(inputBuffer[i]); + } + } + + return (columnNames, ParseSimpleTsvParallel(inputBuffer, columnNames.Length, startOfData, inputBuffer.Length)); + } + + public static string[][] ParseSimpleTsvParallel(byte[] inputBuffer, int numFields, int startIndex, int endIndex) + { + var fieldBytes = new List(); + var fields = new List(); + var records = new List(); + + int line = 1; + int currentLineStart = 0; + + // Go back to the start of the current line + int i = startIndex; + while (inputBuffer[i] != '\n') + { + i--; + } + + // We want to start at the first byte of the current line + i++; + + for (; i < endIndex; i++) + { + if (inputBuffer[i] == '\\') + { + if (i + 1 == inputBuffer.Count()) + { + throw new Exception($"Found '\\' at end of input"); + } + if (inputBuffer[i + 1] == 'n') + { + fieldBytes.Add((byte)'\n'); + i++; + } + else if (inputBuffer[i + 1] == '\\') + { + fieldBytes.Add((byte)'\\'); + i++; + } + else if (inputBuffer[i + 1] == 't') + { + fieldBytes.Add((byte)'\t'); + i++; + } + else if (inputBuffer[i + 1] == '#') + { + fieldBytes.Add((byte)'#'); + i++; + } + else + { + throw new Exception($"Expected 'n', 't', '#', or '\\' after '\\' at line {line} column {i - currentLineStart}"); + } + } + else if (inputBuffer[i] == '\t') + { + // end of field + fields.Add(fieldBytes.ToArray()); + fieldBytes.Clear(); + } + else if (inputBuffer[i] == '\n') + { + fields.Add(fieldBytes.ToArray()); + fieldBytes.Clear(); + + if (numFields != fields.Count) + { + throw new Exception($"Expected {numFields} fields on line {line}, but found {fields.Count}"); + } + else + { + var fieldStrings = new string[fields.Count]; + for (int j = 0; j < fields.Count; j++) + { + try + { + fieldStrings[j] = Encoding.UTF8.GetString(fields[j]); + } + catch (Exception e) + { + throw new Exception($"Line {line}, column {j} is not valid UTF-8", e); + } + } + records.Add(fieldStrings); + fields.Clear(); + } + + line++; + currentLineStart = i + 1; + } + else if (inputBuffer[i] == '#') + { + throw new Exception($"Found unescaped '#' at line {line}, column {i - currentLineStart}"); + } + else + { + fieldBytes.Add(inputBuffer[i]); + } + } + + fields.Add(fieldBytes.ToArray()); + + if (fields.Count == 0) + { + throw new Exception("Found 0 fields on last line. Possibly because of extra \\n after last record"); + } + if (numFields != fields.Count) + { + throw new Exception($"Expected {numFields} fields on line {line}, but found {fields.Count}"); + } + else + { + var fieldStrings = new string[fields.Count]; + for (int j = 0; j < fields.Count; j++) + { + try + { + fieldStrings[j] = Encoding.UTF8.GetString(fields[j]); + } + catch (Exception e) + { + throw new Exception($"Line {line}, column {j} is not valid UTF-8", e); + } + } + records.Add(fieldStrings); + fields.Clear(); + } + + return records.ToArray(); + } + public static Type GetColumnFromType(Type type) { if (type == typeof(string)) @@ -893,7 +1252,7 @@ public class SaneTsv public static byte[] SerializeSimpleTsv(IList data) where T : TsvRecord { - return SerializeTsv(data, FormatType.TYPED_TSV); + return SerializeTsv(data, FormatType.SIMPLE_TSV); } public static byte[] SerializeTypedTsv(IList data) where T : TsvRecord @@ -977,14 +1336,17 @@ public class SaneTsv } } - bytes.Add((byte)':'); - try + if (tsvFormat != FormatType.SIMPLE_TSV) { - bytes.AddRange(Encoding.UTF8.GetBytes(GetNameFromColumn(headerTypes[i]))); - } - catch (Exception e) - { - throw new Exception($"Invalid header type for column {i}", e); + bytes.Add((byte)':'); + try + { + bytes.AddRange(Encoding.UTF8.GetBytes(GetNameFromColumn(headerTypes[i]))); + } + catch (Exception e) + { + throw new Exception($"Invalid header type for column {i}", e); + } } if (i == headerNames.Count - 1) diff --git a/SaneTsv/SaneTsvTest/Program.cs b/SaneTsv/SaneTsvTest/Program.cs index d117329..b3bfd40 100644 --- a/SaneTsv/SaneTsvTest/Program.cs +++ b/SaneTsv/SaneTsvTest/Program.cs @@ -101,6 +101,18 @@ internal class Program public double BinFloat { get; set; } } + public class StringTestRecord : SaneTsv.TsvRecord + { + [SaneTsv.TypedTsvColumn("column1")] + public string Column1 { get; set; } + + [SaneTsv.TypedTsvColumn] + public string column2 { get; set; } + + [SaneTsv.TypedTsvColumn("columnthree\nyep")] + public string Column3 { get; set; } + } + private static void Main(string[] args) { { @@ -275,7 +287,7 @@ internal class Program { string testName = "Try to parsed a Typed TSV as a Simple TSV"; - string testString1 = + string testString1 = "column1:type:boolean\tcolumn2:binary\tcolumnthree\\nyep:string" + "\nTRUE\tvalue\\\\t\0woo\tvaluetrhee" + "\nFALSE\tnother\tno\\ther"; @@ -292,6 +304,160 @@ internal class Program } } + { + string testName = "Timing comparison of simple parse methods and comparison of simple serialization methods"; + + int N = 1000000; + var records = new StringTestRecord[N]; + var rand = new Random(1); + + for (int i = 0; i < N; i++) + { + records[i] = new StringTestRecord() + { + Column1 = rand.Next().ToString(), + column2 = rand.Next().ToString(), + Column3 = rand.Next().ToString(), + }; + } + + string[][] recordStrings = records.Select(record => new string[] { record.Column1, record.column2, record.Column3 }).ToArray(); + + DateTime lastTime = DateTime.Now; + byte[] serialized1 = SaneTsv.SerializeSimpleTsv(records); + + TimeSpan speccedSerializationTime = DateTime.Now - lastTime; + Console.WriteLine($"Specced serialization time: {speccedSerializationTime}"); + lastTime = DateTime.Now; + + byte[] serialized2 = SaneTsv.SerializeSimpleTsv(new string[] { "column1", "column2", "columnthree\nyep" }, recordStrings); + + TimeSpan unspeccedSerializationTime = DateTime.Now - lastTime; + Console.WriteLine($"Unspecced serialization time: {unspeccedSerializationTime}"); + lastTime = DateTime.Now; + + Tsv parsed = SaneTsv.ParseSimpleTsv(serialized1); + + TimeSpan speccedParseTime = DateTime.Now - lastTime; + Console.WriteLine($"Specced parse time: {speccedParseTime}"); + lastTime = DateTime.Now; + + (string[] columns, string[][] data) = SaneTsv.ParseSimpleTsv(serialized2); + + TimeSpan unspeccedParseTime = DateTime.Now - lastTime; + Console.WriteLine($"Unspecced parse time: {unspeccedParseTime}"); + } + + { + string testName = "Check parallel serialization"; + + int N = 100000; + var records = new StringTestRecord[N]; + var rand = new Random(1); + + for (int i = 0; i < N; i++) + { + records[i] = new StringTestRecord() + { + Column1 = rand.Next().ToString(), + column2 = rand.Next().ToString(), + Column3 = rand.Next().ToString(), + }; + } + + string[][] recordStrings = records.Select(record => new string[] { record.Column1, record.column2, record.Column3 }).ToArray(); + + DateTime lastTime = DateTime.Now; + byte[] serialized1 = SaneTsv.SerializeSimpleTsv(new string[] { "column1", "column2", "columnthree\nyep" }, recordStrings); + TimeSpan unparallelTime = DateTime.Now - lastTime; + lastTime = DateTime.Now; + byte[] serialized2 = SaneTsv.SerializeSimpleTsvParallel(new string[] { "column1", "column2", "columnthree\nyep" }, recordStrings); + TimeSpan parallelTime = DateTime.Now - lastTime; + + Console.WriteLine($"Unparallel serialization time: {unparallelTime}"); + Console.WriteLine($"Parallel serialization time: {parallelTime}"); + + bool matching = true; + for (int i = 0; i < Math.Min(serialized1.Length, serialized2.Length); i++) + { + if (serialized1[i] != serialized2[i]) + { + matching = false; + break; + } + } + + if (matching) + { + Console.WriteLine($"Passed {testName}"); + } + else + { + Console.WriteLine($"Failed {testName}"); + } + } + + { + string testName = "Check parallel parsing"; + + int N = 100000; + var records = new StringTestRecord[N]; + var rand = new Random(1); + + for (int i = 0; i < N; i++) + { + records[i] = new StringTestRecord() + { + Column1 = rand.Next().ToString(), + column2 = rand.Next().ToString(), + Column3 = rand.Next().ToString(), + }; + } + + byte[] serialized = SaneTsv.SerializeSimpleTsv(records); + + DateTime lastTime = DateTime.Now; + (string[] headers2, string[][] data2) = SaneTsv.ParseSimpleTsv(serialized); + TimeSpan unparallelTime = DateTime.Now - lastTime; + lastTime = DateTime.Now; + (string[] headers, string[][] data) = SaneTsv.ParseSimpleTsvParallel(serialized); + TimeSpan parallelTime = DateTime.Now - lastTime; + + Console.WriteLine($"Unparallel serialization time: {unparallelTime}"); + Console.WriteLine($"Parallel serialization time: {parallelTime}"); + + bool matching = true; + for (int j = 0; j < Math.Min(headers2.Length, headers.Length); j++) + { + if (headers[j] != headers2[j]) + { + matching = false; + break; + } + } + + for (int i = 0; i < Math.Min(data.Length, data2.Length) && matching; i++) + { + for (int j = 0; j < data[0].Length; j++) + { + if (data[i][j] != data2[i][j]) + { + matching = false; + break; + } + } + } + + if (matching) + { + Console.WriteLine($"Passed {testName}"); + } + else + { + Console.WriteLine($"Failed {testName}"); + } + } + Console.WriteLine("Done with tests"); } }