Complete parallel parsing implementation

This commit is contained in:
Nathan McRae 2024-03-08 11:43:06 -08:00
parent 78eaa5dbab
commit 0c61128e0e
2 changed files with 39 additions and 3 deletions

View File

@ -588,9 +588,40 @@ public class SaneTsv
} }
} }
T[] parsedRecords = ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), currentLineStart - 1, inputBuffer.Length); // TODO: need to figure out where the crossover is
// Complication: it probably depends on processor count
if (inputBuffer.Length < 10000)
{
parsed.Records.AddRange(ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), currentLineStart - 1, inputBuffer.Length));
return parsed;
}
else
{
int parseStart = currentLineStart;
int tasks = Environment.ProcessorCount - 1;
int splitCount = (inputBuffer.Length - parseStart) / tasks;
T[][] parsedValues = new T[tasks][];
Parallel.For(0, tasks, i =>
{
int startIndex = i * splitCount + parseStart - 1;
int endIndex;
if (i == tasks - 1)
{
endIndex = inputBuffer.Length;
}
else
{
endIndex = (i + 1) * splitCount + parseStart;
}
parsed.Records.AddRange(parsedRecords); parsedValues[i] = ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), startIndex, endIndex);
});
for (int i = 0; i < tasks; i++)
{
parsed.Records.AddRange(parsedValues[i]);
}
}
return parsed; return parsed;
} }
@ -730,6 +761,11 @@ public class SaneTsv
} }
} }
if (endIndex < inputBuffer.Length)
{
return parsed.ToArray();
}
fields.Add(fieldBytes.ToArray()); fields.Add(fieldBytes.ToArray());
if (fields.Count == 0) if (fields.Count == 0)

View File

@ -510,7 +510,7 @@ internal class Program : SaneTsv
{ {
string testName = "Check parallel parsing"; string testName = "Check parallel parsing";
int N = 1000; int N = 1000000;
var records = new BoolTestRecord[N]; var records = new BoolTestRecord[N];
var rand = new Random(1); var rand = new Random(1);