Сбой чтения из HttpResponseStream

Я столкнулся с проблемой, когда чтение из HttpResponseStream не удается, потому что StreamReader, который я оборачиваю, читает быстрее, чем поток ответов получает фактический ответ. Я получаю файл достаточно небольшого размера (около 60 КБ), но синтаксический анализатор, который обрабатывает ответ в фактический объект, дает сбой, потому что он сталкивается с неожиданным символом (код 65535), который, по опыту, я знаю, что это символ, созданный при чтении из StreamReader и больше нет доступных символов.

Для записи я знаю, что возвращаемый контент действителен и будет правильно анализироваться, поскольку сбой происходит в разных точках файла каждый раз, когда я запускаю код. Это строка parser.Load() в следующем, где она терпит неудачу.

Есть ли способ убедиться, что я прочитал весь контент, прежде чем пытаться его проанализировать, за исключением копирования потока ответов в MemoryStream или строку и последующей его обработки?

    /// <summary>
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries
    /// </summary>
    /// <param name="sparqlQuery">SPARQL Query String</param>
    /// <returns>RDF Graph</returns>
    public Graph QueryWithResultGraph(String sparqlQuery)
    {
        try
        {
            //Build the Query URI
            StringBuilder queryUri = new StringBuilder();
            queryUri.Append(this._endpoint.ToString());
            queryUri.Append("?query=");
            queryUri.Append(Uri.EscapeDataString(sparqlQuery));

            if (!this._defaultGraphUri.Equals(String.Empty))
            {
                queryUri.Append("&default-graph-uri=");
                queryUri.Append(Uri.EscapeUriString(this._defaultGraphUri));
            }

            //Make the Query via HTTP
            HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false);

            //Set up an Empty Graph ready
            Graph g = new Graph();
            g.BaseURI = this._endpoint;

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            parser.Load(g, new StreamReader(httpResponse.GetResponseStream()));

            return g;
        }
        catch (UriFormatException uriEx)
        {
            //URI Format Invalid
            throw new Exception("The format of the URI was invalid", uriEx);
        }
        catch (WebException webEx)
        {
            //Some sort of HTTP Error occurred
            throw new Exception("A HTTP Error occurred", webEx);
        }
        catch (RDFException)
        {
            //Some problem with the RDF or Parsing thereof
            throw;
        }
        catch (Exception)
        {
            //Other Exception
            throw;
        }
    }

    /// <summary>
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint
    /// </summary>
    /// <param name="target">URI to make Request to</param>
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param>
    /// <returns>HTTP Response</returns>
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly)
    {
        //Expect errors in this function to be handled by the calling function

        //Set-up the Request
        HttpWebRequest httpRequest;
        HttpWebResponse httpResponse;
        httpRequest = (HttpWebRequest)WebRequest.Create(target);

        //Use HTTP GET/POST according to user set preference
        if (!sparqlOnly)
        {
            httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader();
            //For the time being drop the application/json as this doesn't play nice with Virtuoso
            httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty);
        }
        else
        {
            httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader();
        }
        httpRequest.Method = this._httpMode;
        httpRequest.Timeout = this._timeout;

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugRequest(httpRequest);
        }

        httpResponse = (HttpWebResponse)httpRequest.GetResponse();

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugResponse(httpResponse);
        }

        return httpResponse;
    }

Изменить

Чтобы уточнить то, что я уже сказал, это не ошибка в парсере, это проблема чтения StreamReader быстрее, чем поток ответов предоставляет данные. Я могу обойти это, выполнив следующие действия, но хотел бы предложить лучшие или более элегантные решения:

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            Stream response = httpResponse.GetResponseStream();
            MemoryStream temp = new MemoryStream();
            Tools.StreamCopy(response, temp);
            response.Close();
            temp.Seek(0, SeekOrigin.Begin);
            parser.Load(g, new StreamReader(temp));

Изменить 2

Класс BlockingStreamReader согласно предложению Eamon:

/// <summary>
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams)
/// </summary>
public sealed class BlockingStreamReader : StreamReader
{
    private bool _peeked = false;
    private int _peekChar = -1;

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { }

    public BlockingStreamReader(Stream stream) : base(stream) { }

    public override int Read()
    {
        if (this._peeked)
        {
            this._peeked = false;
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            return cs[0];
        }
    }

    public override int Peek()
    {
        if (this._peeked)
        {
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            this._peeked = true;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            this._peekChar = cs[0];
            return this._peekChar;
        }
    }

    public new bool EndOfStream
    {
        get
        {
            return (base.EndOfStream && !this._peeked);
        }
    }
}

Изменить 3

Вот значительно улучшенное решение, которое может обернуть любое TextReader и предоставить свойство EndOfStream. Он использует внутренний буфер, который заполняется с помощью ReadBlock() в обернутом TextReader. Все методы чтения () чтения могут быть определены с использованием этого буфера, размер буфера настраивается:

    /// <summary>
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency.
/// </summary>
/// <remarks>
/// <para>
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data.  All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see>
/// </para>
/// </remarks>
public sealed class BlockingTextReader : TextReader
{
    private char[] _buffer;
    private int _pos = -1;
    private int _bufferAmount = -1;
    private bool _finished = false;
    private TextReader _reader;

    public const int DefaultBufferSize = 1024;

    public BlockingTextReader(TextReader reader, int bufferSize)
    {
        if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader");
        if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize");
        this._reader = reader;
        this._buffer = new char[bufferSize];
    }

    public BlockingTextReader(TextReader reader)
        : this(reader, DefaultBufferSize) { }

    public BlockingTextReader(Stream input, int bufferSize)
        : this(new StreamReader(input), bufferSize) { }

    public BlockingTextReader(Stream input)
        : this(new StreamReader(input)) { }

    private void FillBuffer()
    {
        this._pos = -1;
        if (this._finished)
        {
            this._bufferAmount = 0;
        }
        else
        {
            this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length);
            if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true;
        }
    }

    public override int ReadBlock(char[] buffer, int index, int count)
    {
        if (count == 0) return 0;
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (index < 0) throw new ArgumentException("index", "Index must be >= 0");
        if (count < 0) throw new ArgumentException("count", "Count must be >= 0");
        if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small");

        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return 0;
            }
            else
            {
                return 0;
            }
        }

        this._pos = Math.Max(0, this._pos);
        if (count <= this._bufferAmount - this._pos)
        {
            //If we have sufficient things buffered to fufill the request just copy the relevant stuff across
            Array.Copy(this._buffer, this._pos, buffer, index, count);
            this._pos += count;
            return count;
        }
        else
        {
            int copied = 0;
            while (copied < count)
            {
                int available = this._bufferAmount - this._pos;
                if (count < copied + available)
                {
                    //We can finish fufilling this request this round
                    int toCopy = Math.Min(available, count - copied);
                    Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy);
                    copied += toCopy;
                    this._pos += toCopy;
                    return copied;
                }
                else
                {
                    //Copy everything we currently have available
                    Array.Copy(this._buffer, this._pos, buffer, index + copied, available);
                    copied += available;
                    this._pos = this._bufferAmount;

                    if (!this._finished)
                    {
                        //If we haven't reached the end of the input refill our buffer and continue
                        this.FillBuffer();
                        if (this.EndOfStream) return copied;
                        this._pos = 0;
                    }
                    else
                    {
                        //Otherwise we have reached the end of the input so just return what we've managed to copy
                        return copied;
                    }
                }
            }
            return copied;
        }
    }

    public override int Read(char[] buffer, int index, int count)
    {
        return this.ReadBlock(buffer, index, count);
    }

    public override int Read()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        this._pos++;
        return (int)this._buffer[this._pos];
    }

    public override int Peek()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        return (int)this._buffer[this._pos + 1];
    }

    public bool EndOfStream
    {
        get
        {
            return this._finished && (this._pos >= this._bufferAmount - 1);
        }
    }

    public override void Close()
    {
        this._reader.Close();
    }

    protected override void Dispose(bool disposing)
    {
        this.Close();
        this._reader.Dispose();
        base.Dispose(disposing);
    }
}

person RobV    schedule 12.08.2009    source источник
comment
Итак, спустя девять лет после того, как он был представлен, вы оказались первым человеком в мире, который обнаружил, что StreamReader читается быстрее, чем Stream, для которого он предназначен, верно?   -  person John Saunders    schedule 12.08.2009
comment
Нет, мне просто интересно, есть ли у кого-нибудь решения более элегантные, чем приведенные выше.   -  person RobV    schedule 12.08.2009
comment
Решения чего? StreamReader читается не быстрее, чем Stream.   -  person John Saunders    schedule 12.08.2009
comment
Ошибка в вашем коде. Помните: это всегда ваша вина. Это никогда не происходит по вине ОС или Framework.   -  person John Saunders    schedule 12.08.2009
comment
вы действительно не имеете смысла, ваш первый комментарий предполагает, что StreamReader работает слишком быстро, а затем вы говорите, что это не так, вы на самом деле совсем не помогаете мне решить эту проблему   -  person RobV    schedule 12.08.2009
comment
О, в фреймворке много ошибок, и я столкнулся с несколькими, но очевидно, что чаще всего ошибка возникает в вашем коде, в данном случае в синтаксическом анализаторе.   -  person Eamon Nerbonne    schedule 13.08.2009
comment
Да, но дело в том, что ошибка была не в моем коде, а во фреймворке. Дело в том, что вызов Read() не блокировался при использовании сетевых потоков, так как парсер считывал данные из потока быстрее, чем сеть могла их предоставить.   -  person RobV    schedule 28.09.2010


Ответы (2)


Не зная специфики анализатора, который вы используете, я могу только догадываться об ошибке, но есть довольно простая ошибка, которую библиотеки ввода-вывода .NET Framework почти поощряют вас делать...

Знаете ли вы, что Streams и TextReaders могут читать меньше байтов/символов, чем запрошено?

В частности, документы TextReader.Read(char[] buffer, int index, int count) говорят:

Возвращаемое значение

Тип: Система..::.Int32

Количество прочитанных символов. Число будет меньше или равно count, в зависимости от того, доступны ли данные в потоке. Этот метод возвращает ноль, если вызывается, когда больше не осталось символов для чтения.

Акцент мой.

Например, если вы вызываете reader.Read(buffer, 0, 100), вы не можете предположить, что было прочитано 100 символов.

Правка: вполне вероятно, что синтаксический анализатор действительно предполагает это; и это объясняет наблюдаемое вами поведение: если вы полностью кэшируете поток в MemoryStream, всегда будет достаточно символов для выполнения запроса, но если вы этого не сделаете, синтаксический анализатор получит меньше символов, чем запрошено в непредсказуемое время, когда основной поток «медленный».

Edit2: вы можете исправить ошибку, заменив все экземпляры TextReader.Read() в синтаксическом анализаторе на TextReader.ReadBlock().

person Eamon Nerbonne    schedule 12.08.2009
comment
Я знал об этом, я не уверен, что это необходимо считать ошибкой в ​​StreamReader, скорее, это то, как он ведет себя, когда основной поток может быть медленным. Парсер не является проблемой, если я использую второй фрагмент кода (добавленный к исходному вопросу), который читает весь поток перед его разбором, отлично анализирует - person RobV; 12.08.2009
comment
Это является ошибкой парсера с очень высокой вероятностью. Так задумано, если базовый поток медленный, streamreader возвращает меньше символов, чем запрошено. Использование потока памяти в качестве базового потока приводит к тому, что streamreader всегда возвращает полное количество символов, что позволяет обойти ошибку в синтаксическом анализаторе. - person Eamon Nerbonne; 13.08.2009
comment
Парсер использует базовый токенизатор, который считывает символ за символом с помощью метода Read(), поэтому вы, скорее всего, правы, я проверю вещь ReadBlock() и приму ваш ответ, если это поможет решить проблему. - person RobV; 13.08.2009
comment
Метод ReadBlock() не решает мою проблему полностью, поскольку даже если я его использую, мне все равно нужно делать много вызовов Peek(), что приводит к той же проблеме, что и Read(). - person RobV; 14.08.2009
comment
Тогда у вас есть три варианта: (1) просто предварительно кэшировать весь поток в потоке памяти; (2) реализовать свой собственный подкласс TextReader, который обертывает другой TextReader и блокирует Peek() и Read() (на самом деле это довольно просто; вам нужно только реализовать Peek+Read с точки зрения ReadBlock), или (3) заменить вызовы Peek с локальным односимвольным упреждающим символом, заполненным с помощью ReadBlock (который вам нужно будет убедиться, что он будет включен вручную при следующем чтении. Я бы предпочел вариант (2). - person Eamon Nerbonne; 14.08.2009
comment
(о, и если вы подкласс - не забывайте, что TextReader IDisposable) - person Eamon Nerbonne; 14.08.2009
comment
вариант (2) довольно элегантен и именно то, на что я надеялся, есть небольшая сложность с тем фактом, что вы не можете переопределить EndOfStream и вместо этого должны его затенять, что означает, что вам нужно, чтобы StreamReader блокировал вас чтобы убедиться, что вы набрали его как подкласс, а не как StreamReader, иначе вы преждевременно достигнете конца потока. Я разместил свой код как редактирование вопроса - person RobV; 17.08.2009

Для поддержки сценария блокирующего чтения вместо создания подкласса StreamReader вы можете создать подкласс TextReader: это позволит избежать проблем с EndOfStream и означает, что вы можете блокировать любое чтение, а не только StreamReader:

public sealed class BlockingReader : TextReader
{
    bool hasPeeked;
    int peekChar;
    readonly TextReader reader;

    public BlockingReader(TextReader reader) { this.reader = reader; }

    public override int Read()
    {
        if (!hasPeeked)
            return reader.Read();
        hasPeeked = false;
        return peekChar;
    }

    public override int Peek()
    {
        if (!hasPeeked)
        {
            peekChar = reader.Read();
            hasPeeked = true;
        }
        return peekChar;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if ((buffer.Length - index) < count)
            throw new ArgumentException("Buffer too small");

        int peekCharsRead = 0;
        if (hasPeeked)
        {
            buffer[index] = (char)peekChar;
            hasPeeked = false;
            index++;
            count--;
            peekCharsRead++;
        }

        return peekCharsRead + reader.ReadBlock(buffer, index, count);
    }

    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
                reader.Dispose();
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}
person Eamon Nerbonne    schedule 17.08.2009