- using System;
- using System.Collections.Concurrent;
- using System.IO;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- namespace FileCombiner
- {
/// <summary>
/// Console entry point. Reads every file found directly in <c>_dirPath</c> and
/// appends each of its lines to <c>_outFile</c> as "file path, TAB, line text".
/// </summary>
/// <remarks>
/// Files and lines are fanned out with PLINQ and every write is queued on an
/// <see cref="EventLoop"/>, so the order of lines in the output is not guaranteed.
/// </remarks>
class Program
{
    // Folder scanned for input files (top level only, as Directory.GetFiles(path) does).
    private static readonly string _dirPath = @"C:\GIT\FileCombiner\FileCombiner\Files";

    // Combined output; lives in a sub-folder, so it is never picked up as an input file.
    private static readonly string _outFile = Path.Combine(_dirPath, @"Output\output.txt");

    /// <summary>
    /// Clears any previous output, queues the combining work and blocks until
    /// the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // File.Delete and File.AppendText both throw DirectoryNotFoundException when
        // the output folder is missing, so make sure it exists before touching the file.
        Directory.CreateDirectory(Path.GetDirectoryName(_outFile));
        File.Delete(_outFile);

        var output = new OutputWriter(_outFile);
        var eventLoop = new EventLoop();
        eventLoop.Start();

        // The directory scan itself is queued, and it queues one write per line.
        eventLoop.Enqueue(() => Directory
            .GetFiles(_dirPath)
            .AsParallel()
            .ForAll(file =>
            {
                File.ReadAllLines(file)
                    .AsParallel()
                    .ForAll(line => eventLoop.Enqueue(
                        () => output.WriteOutput(String.Format("{0}\t{1}", file, line))));
            }));

        // not worried about completion notice at the moment - just waiting for sample file to hit 4290k.
        Console.WriteLine("Processing....press Enter to exit when complete");
        Console.ReadLine();

        // Let the dispatcher leave its loop instead of abandoning it at process exit.
        eventLoop.Stop();
    }
}
/// <summary>
/// Appends lines of text to a single file, serializing all writes made through
/// the same instance.
/// </summary>
public class OutputWriter
{
    private readonly string _path;
    private readonly Object _lockSync = new Object();

    /// <summary>Creates a writer that appends to the given file.</summary>
    /// <param name="path">Path of the file to append to.</param>
    public OutputWriter(string path)
    {
        _path = path;
    }

    /// <summary>
    /// Appends <paramref name="line"/> followed by a line terminator to the file.
    /// The file is opened and closed again on every call.
    /// </summary>
    /// <param name="line">Text to append.</param>
    public void WriteOutput(string line)
    {
        // One writer at a time per instance; deliberately coarse, it is only a sample.
        lock (_lockSync)
        {
            File.AppendAllText(_path, line + Environment.NewLine);
        }
    }
}
/// <summary>
/// Minimal dispatcher: actions are placed on a concurrent queue and a single
/// loop dequeues them and starts each one as its own task.
/// </summary>
/// <remarks>
/// Actions are started, not awaited, so several may run at the same time and
/// they may finish in a different order from the one they were queued in.
/// </remarks>
public class EventLoop
{
    // volatile: written by whoever calls Start/Stop, read by the loop thread.
    private volatile bool _running;

    /// <summary>True while the loop should keep dispatching.</summary>
    public bool Running
    {
        get { return _running; }
        set { _running = value; }
    }

    /// <summary>Actions waiting to be dispatched.</summary>
    public ConcurrentQueue<Action> ActionQueue { get; set; }

    /// <summary>Signalled whenever there may be work to do or the loop must stop.</summary>
    public ManualResetEventSlim Wait { get; set; }

    /// <summary>Creates a stopped loop with an empty queue.</summary>
    public EventLoop()
    {
        ActionQueue = new ConcurrentQueue<Action>();
        Wait = new ManualResetEventSlim(false);
    }

    /// <summary>
    /// Dispatch loop: runs until <see cref="Running"/> becomes false, starting a
    /// task for every queued action and sleeping while the queue is empty.
    /// </summary>
    public void Loop()
    {
        while (Running)
        {
            Action action;
            if (ActionQueue.TryDequeue(out action))
            {
                try
                {
                    // An exception thrown inside the action is captured by its task and
                    // never reaches the catch below, so log it from a faulted-only continuation.
                    Task.Factory
                        .StartNew(action)
                        .ContinueWith(
                            faulted => Console.WriteLine(faulted.Exception),
                            TaskContinuationOptions.OnlyOnFaulted);
                }
                catch (Exception ex)
                {
                    // Failures to start the task at all (for example a null action).
                    Console.WriteLine(ex);
                }

                continue;
            }

            // Reset first, then look again. An Enqueue or Stop that slipped in after the
            // failed TryDequeue either shows up in this re-check or sets the event after
            // the reset, so its wake-up can no longer be erased.
            Wait.Reset();
            if (Running && ActionQueue.IsEmpty)
            {
                Wait.Wait();
            }
        }
    }

    /// <summary>Queues an action and wakes the loop.</summary>
    /// <param name="action">Work to dispatch.</param>
    public void Enqueue(Action action)
    {
        ActionQueue.Enqueue(action);
        Wait.Set();
    }

    /// <summary>Marks the loop as running and starts it in the background.</summary>
    public void Start()
    {
        Running = true;

        // The loop blocks for as long as the dispatcher lives, so ask for a
        // long-running task instead of tying up a regular pool worker.
        Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Asks the loop to exit and wakes it if it is sleeping. Actions still in
    /// the queue are not dispatched.
    /// </summary>
    public void Stop()
    {
        Running = false;
        Wait.Set();
    }
}
- }