HorrorInferno
@HorrorInferno
веб-разработчик, бэкэндер

Как разбить парсинг XML файла на несколько потоков в C#?

Добрый день.

Есть задача распарсить огромный XML файл (1ТБ) и занести данные в БД.
В одном единственном потоке это всё работает очень медленно, и парсер закончит свою работу приблизительно через три года :D

В общем нужно как-то грамотно разбить парсинг на несколько потоков. Есть варианты?

P.S. с многопоточностью в C# я еще ни разу не работал.

Вот код, который у меня работает в данный момент:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml;
using System.Xml.Linq;
using Npgsql;

namespace MapReader
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Write("> ");

            string path = Console.ReadLine();

            IEnumerable<XElement> root = from el in Root(path) select el;

            Osm2Pg pgosm = new Osm2Pg();

            pgosm.CreateTables();

            foreach (XElement item in root)
            {

                if (item.Name == "way")
                {

                    long wayID = long.Parse(item.Attribute("id").Value);

                    Console.WriteLine("way: " + item.Attribute("id").Value);

                    foreach (XElement nd in item.Elements("nd"))
                    {

                        long nodeReference = long.Parse(nd.Attribute("ref").Value);
                        pgosm.InsertWayNds(wayID, nodeReference);

                        Console.WriteLine("--nd: " + nd.Attribute("ref").Value);
                    }

                    foreach (XElement tag in item.Elements("tag"))
                    {

                        string key = tag.Attribute("k").Value;
                        string value = tag.Attribute("v").Value;
                        pgosm.InsertWayTags(wayID, key, value);

                        Console.WriteLine("--tag: " + tag.Attribute("k").Value);
                    }

                }


                // проходимя по node.
                if (item.Name == "node")
                {
                    // конвертируем координаты из географической системы в декартову.
                    double lon = double.Parse(item.Attribute("lon").Value);
                    double lat = double.Parse(item.Attribute("lat").Value);

                    float x = (float)GeoHelper.lonToX(lon);
                    float z = (float)GeoHelper.latToY(lat);


                    long nodeId = long.Parse(item.Attribute("id").Value);
                    pgosm.InsertNodes(nodeId, x, z);
                    
                    Console.WriteLine("node: " + x + "," + z);

                    if (item.HasAttributes)
                    {
                        foreach (XElement tag in item.Elements("tag"))
                        {

                            string key = tag.Attribute("k").Value;
                            string value = tag.Attribute("v").Value;
                            pgosm.InsertNodeTags(nodeId, key, value);

                            Console.WriteLine("--tag: " + tag.Attribute("k").Value);
                        }
                    }
                }
            }

            Console.WriteLine("End of program...");
            Console.Read();

        }

        // магия б***ь...
        static IEnumerable<XElement> Root(string path)
        {
            using (XmlReader reader = XmlReader.Create(path))
            {
                while (reader.Read())
                {
                    if (reader.Name == "way" || reader.Name == "node")
                    {
                        XElement el = XElement.ReadFrom(reader) as XElement;
                        if (el != null)
                            yield return el;
                    }
                }
            }

        }

    }
}
  • Вопрос задан
  • 655 просмотров
Пригласить эксперта
Ответы на вопрос 2
2ord
@2ord
В добавок к сказанному у cicatrix
заменить циклы foreach на
Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());

При помощи библиотеки Task-based Asynchronous Programming
Это избавит от необходимости в
Количество потоков подобрать под конкретное железо, либо (если есть желание заморочиться) можно сделать настраиваемым - начать с 4 потоков, считать среднюю скорость обработки узлов (кол-во узлов в минуту), вводить по одному потоку в минуту, измеряя, увеличилось ли время или уменьшилось. Если время увеличилось - возвращаем прежнее количество, если уменьшилось - добавляем ещё поток, пока не найдём оптимальный вариант.
Ответ написан
Комментировать
@cicatrix
было бы большой ошибкой думать
Я смотрю, у вас какой-то XML ридер особый. Многое зависит от его реализации, а именно - его потокобезопасности.
В принципе, у вас главный foreach можно распараллелить следующим образом:
Делаем по шаблону "производитель-потребитель"

Производитель у вас будет один - ваш ридер, который должен "поставлять" ссылки на отдельные узлы вашего файла. Потребителем должны стать потоки, хватающие первый попавшийся (и учтите это сразу - они будут хватать не по порядку, а именно - первый попавшийся узел) и парсящие их.
Количество потоков подобрать под конкретное железо, либо (если есть желание заморочиться) можно сделать настраиваемым - начать с 4 потоков, считать среднюю скорость обработки узлов (кол-во узлов в минуту), вводить по одному потоку в минуту, измеряя, увеличилось ли время или уменьшилось. Если время увеличилось - возвращаем прежнее количество, если уменьшилось - добавляем ещё поток, пока не найдём оптимальный вариант.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы