Complete parallel parsing implementation
This commit is contained in:
		
							
								
								
									
										40
									
								
								SaneTsv.cs
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								SaneTsv.cs
									
									
									
									
									
								
							@@ -588,9 +588,40 @@ public class SaneTsv
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    T[] parsedRecords = ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), currentLineStart - 1, inputBuffer.Length);
 | 
			
		||||
    // TODO: need to figure out where the crossover is
 | 
			
		||||
    // Complication: it probably depends on processor count
 | 
			
		||||
    if (inputBuffer.Length < 10000)
 | 
			
		||||
    {
 | 
			
		||||
      parsed.Records.AddRange(ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), currentLineStart - 1, inputBuffer.Length));
 | 
			
		||||
      return parsed;
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      int parseStart = currentLineStart;
 | 
			
		||||
      int tasks = Environment.ProcessorCount - 1;
 | 
			
		||||
      int splitCount = (inputBuffer.Length - parseStart) / tasks;
 | 
			
		||||
      T[][] parsedValues = new T[tasks][];
 | 
			
		||||
      Parallel.For(0, tasks, i =>
 | 
			
		||||
      {
 | 
			
		||||
        int startIndex = i * splitCount + parseStart - 1;
 | 
			
		||||
        int endIndex;
 | 
			
		||||
        if (i == tasks - 1)
 | 
			
		||||
        {
 | 
			
		||||
          endIndex = inputBuffer.Length;
 | 
			
		||||
        }
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
          endIndex = (i + 1) * splitCount + parseStart;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    parsed.Records.AddRange(parsedRecords);
 | 
			
		||||
        parsedValues[i] = ParseParallel<T>(inputBuffer, format, headerPropertyInfos.ToArray(), headerTypes.ToArray(), startIndex, endIndex);
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      for (int i = 0; i < tasks; i++)
 | 
			
		||||
      {
 | 
			
		||||
        parsed.Records.AddRange(parsedValues[i]);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return parsed;
 | 
			
		||||
  }
 | 
			
		||||
@@ -730,6 +761,11 @@ public class SaneTsv
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (endIndex < inputBuffer.Length)
 | 
			
		||||
    {
 | 
			
		||||
      return parsed.ToArray();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fields.Add(fieldBytes.ToArray());
 | 
			
		||||
   
 | 
			
		||||
    if (fields.Count == 0)
 | 
			
		||||
 
 | 
			
		||||
@@ -510,7 +510,7 @@ internal class Program : SaneTsv
 | 
			
		||||
    {
 | 
			
		||||
      string testName = "Check parallel parsing";
 | 
			
		||||
 | 
			
		||||
      int N = 1000;
 | 
			
		||||
      int N = 1000000;
 | 
			
		||||
      var records = new BoolTestRecord[N];
 | 
			
		||||
      var rand = new Random(1);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user