pythonpython-3.xpyqtpyqt5qdatastream

Parse stream in x length chunks based on uint32 id


I have little C/Qt experience and have a small parser that I need to port to Python. Is anyone able to explain how I can implement the below in Python? I understand what the result is, just can't understand how to achieve the uint32 instantiation and shift that results in different part lengths based on the 4 byte "id". I'm hoping to parse it using just native Python 3.5+ packages, numpy or similar is fine if it makes the typing convenient.

QDataStream stream(item);
stream.setByteOrder(QDataStream::LittleEndian);
Items parts;
while (!stream.atEnd()) {
    quint32 partId;
    stream >> id;
    char *bytes;
    uint length;
    stream.readBytes(bytes, length);
    parts.append(QPair<quint32, QByteArray>(id, QByteArray(bytes, length)));
    delete bytes;
}
return parts;

Solution

  • Since in python the numeric types do not match those of C++ then QDataStream no longer uses the ">>" operator to obtain the value but has specific methods such as readUInt32.

    Considering the following code that generates the data:

    #include <QDataStream>
    #include <QFile>
    #include <QDebug>
    
    int main()
    {
        QFile file("file.dat");
        if(!file.open(QIODevice::WriteOnly)){
            qDebug() << file.error() << file.errorString();
            return EXIT_FAILURE;
        }
        QDataStream stream(&file);
        stream.setByteOrder(QDataStream::LittleEndian);
        stream.setVersion(QDataStream::Qt_5_15);
    
        QVector <QPair<quint32, QByteArray>> data;
        data.append({1, "One"});
        data.append({2, "Two"});
        data.append({3, "Three"});
    
        for(const QPair<quint32, QByteArray> & d: qAsConst(data)){
            stream << d.first;
            stream.writeBytes(d.second.constData(), d.second.size());
        }
        return EXIT_SUCCESS;
    }
    

    The following code gets the data:

    import sys
    
    from PyQt5.QtCore import QByteArray, QDataStream, QFile, QIODevice
    
    
    file = QFile("file.dat")
    if not file.open(QIODevice.ReadOnly):
        print(file.error(), file.errorString())
        sys.exit(-1)
    
    items = []
    
    stream = QDataStream(file)
    stream.setByteOrder(QDataStream.LittleEndian)
    stream.setVersion(QDataStream.Qt_5_15)
    while not stream.atEnd():
        id_ = stream.readUInt32()
        data = stream.readBytes()
        items.append((id_, QByteArray(data)))
    print(items)
    

    Output:

    [(1, PyQt5.QtCore.QByteArray(b'One')), (2, PyQt5.QtCore.QByteArray(b'Two')), (3, PyQt5.QtCore.QByteArray(b'Three'))]
    

    If PySide2 is used then the implementation changes a bit.

    import sys
    
    from PySide2.QtCore import QByteArray, QDataStream, QFile, QIODevice
    
    
    file = QFile("file.dat")
    if not file.open(QIODevice.ReadOnly):
        print(file.error(), file.errorString())
        sys.exit(-1)
    
    items = []
    
    stream = QDataStream(file)
    stream.setByteOrder(QDataStream.LittleEndian)
    stream.setVersion(QDataStream.Qt_5_15)
    while not stream.atEnd():
        id_ = stream.readUInt32()
        data = QByteArray()
        stream >> data
        items.append((id_, data))
    print(items)
    

    Output:

    [(1, PySide2.QtCore.QByteArray(b'One')), (2, PySide2.QtCore.QByteArray(b'Two')), (3, PySide2.QtCore.QByteArray(b'Three'))]
    

    Update:

    It is not possible to obtain the data if you do not use QDataStream since Qt uses its own format for each type of data, and this format is not a standard that can change with each version without notifying it. For this reason, the byteorder and the version of QDataStream used must be indicated.

    Update 2

    Assuming that the format that QDataStream uses to pack quint32 and bytes is QDataStream.Qt_5_15 then a possible implementation is:

    import sys
    import struct
    
    items = []
    
    with open("file.dat", "rb") as f:
        while True:
            try:
                (id_,) = struct.unpack("I", f.read(4))
                (length,) = struct.unpack("I", f.read(4))
                data = f.read(length)
            except (EOFError, struct.error) as e:
                break
            else:
                items.append((id_, data))
    
    print(items)
    

    Output:

    [(1, b'One'), (2, b'Two'), (3, b'Three')]