Distribution of population age with QtConcurrent :: mappedReduced()

This article demonstrates how to process large data sets with QtConcurrent::mappedReduced() on an example of a simple census.

Introduction - MapReduce
MapReduce is a high level programming model for processing large data sets in parallel, originally developed by Google, adapted from functional programming. The model is suitable for a range of problems such as matrix operations, relational algebra, statistical frequency counting etc. To learn more about MapReduce, you can read Chapter 2 from the book Mining of Massive Datasets by Lescovec, Rajaraman & Ullman.

In general, a map function takes an input and produces an intermediate key-value pair or a list of intermediate key-value pairs. Reduce function performs a summary operation on the intermediate keys. You can see this below:

The diagram has been inspired by a course presentation Massive Parallelism with MapReduce.

This example shows a census of a small state with 5 cities. We are going to process multiple files, each corresponding to a single city. The snippet below shows how such a file looks like (this is a part of a file 'city1.txt'). Each line contains an age of a single person.

57
52
31
26
2
7
40
44
12
13
...

In general, there is not a single correct way to implement the desired functionality within the MapReduce model. Some solutions might lead to better performance than others. This article presents two variants of the solution. In this particular case, both variants will result in the same number of calls to map and reduce function. The map function is going to be called 5 times (for every 'city*.txt' file). Similarly, the reduce function is going to be called 5 times. The difference lies in the specifics of the map and reduce function.

Variant 1

Variant 1 follows the standard definition of MapReduce. A single map function reads a single city file and converts it to a QList<int>. The map function only converts a single city file into a different format. There is no grouping of values within a single city. It is the reduce function that performs the grouping of ages into one large state distribution.

Variant 2

In Variant 2 a map function also reads a single city file. However, unlike in Variant 1, the map function also performs a local reduction , i.e. it groups the ages of citizens of a single city into age intervals (this is similar to Qt's official Word Count example). The reduce function then groups the already existing city distributions into one large state distribution.

General implementation for Variant 1 and Variant 2

The implementation presented in this section is identical for both variants of the problem. Look at main.cpp below. We firstly find all the files in the current folder that start with "city" (in our example this only works for the current folder. If you need an implementation that searches through subdirectories, check the recursive implementation in the Qt's Word Count example).

QStringList findFiles(const QString &dirPath, const QString &fileStart)
{
    QStringList filePaths;
    QDir dir(dirPath);

    foreach (QString file, dir.entryList(QDir::Files)){
        if(file.startsWith(fileStart))
            filePaths += dirPath + "/" + file;
    }

    return filePaths;
}
int main(int argc, char *argv[])
{
    QCoreApplication app(argc, argv);
    QStringList filePaths = findFiles(QDir::currentPath(), "city");

    QFuture< QVector f1;
    f1 = QtConcurrent::mappedReduced(filePaths, mapFile, reduce);
    f1.waitForFinished();
    QVector<int> finalDistrib = f1.result();
    qDebug() << finalDistrib;
    drawCharacterHistogram(finalDistrib);

    return 0;
}

After that, we create an instance of QFuture to represent the result of our asynchronous computation and call Qt::mappedReduced(). Qt’s mappedReduced() requires three parameters - a sequence to which to apply the map function, a map function and a reduce function. The fourth parameter is optional and specifies the order in which results from the map function are passed to the reduce function (not relevant for our implementation). You can see the signature below:

QFuture<T> QtConcurrent::mappedReduced ( const Sequence & sequence, MapFunction mapFunction, ReduceFunction reduceFunction, QtConcurrent::ReduceOptions reduceOptions = UnorderedReduce | SequentialReduce)

The map function is going to be called automatically for every file in the 'filePaths' sequence, i.e, all the 'city*.txt' files that have been found in the current folder. The return value of each mapFunction will then automatically be passed to the reduceFunction.

After we start the computation, we wait for it to finish with f1.waitForFinished(), print the result of the computation and draw a simple histogram made out of @ characters. The histogram is drawn in a horizontal fashion by the following function:

void drawCharacterHistogram(QVector= 0; i--){
        for(int j = 0; j < resultVector.at(i); j++)
            std::cout << "@";
        std::cout << "\n";
    }
}
We were able to plot the histogram directly based on the counts. With large data sets, you might want to change the function to compute percentages or scale the data before plotting.

Map function is called concurrently by Qt and for that reason, it has to be thread safe (this is not a problem here, since there is no variable allocated on the heap that the map function would try to modify). On the other hand, only one thread will call the reduce function at the same time. As a result, we do not have to use synchronisation tools (such as a mutex) in the reduce function.

Variant 1 - standard implementation

Variant 1 represents the standard implementation of MapReduce. The map function reads a single city file, checks the validity of the age values and converts it to a QList<int>. This really just results in a different representation of the file, with no grouping into age intervals.

QList<int> mapFile(const QString& fileName) {
    QFile file(fileName);
    file.open(QIODevice::ReadOnly);
    QTextStream in(&file);

    QList<int> ageList;
    while (!in.atEnd()) {
        QString line = in.readLine();
        line = line.trimmed();
        if(!line.isEmpty()){
            bool iOk = false;
            int i = line.toInt(&iOk);
            if(iOk)
                ageList.append(i);
        }
    }
    return ageList;
}

In contrast, the reduce function takes the result of the map function (QList<int>) and groups it into age intervals. The resulting (state-level) distribution is stored in QVector<int> product. The size of the resulting vector (product), i.e. the number of intervals in the histogram, is derived from the yearInterval (number of years that one interval represents, in our case 5 years) and by the age of the oldest person in the state. The product vector is resized whenever it comes across an age that does not fit into any of the current bins.

void reduce(QVector<int> &product,
                              const QList<int> &ageList) {
    for(int i = 0; i < ageList.size(); i++){
        int binNr = ageList.at(i) / yearInterval;
        if(product.size() < binNr+1)
            product.resize(binNr+1);
        product[binNr]++;
    }
}

The bin corresponding to a certain age is derived from the result of the division 'age / yearInterval'. This is shown below:

yearInterval = 5
age = 4, binNr = 4 / 5 = 0
age = 18, binNr = 18 / 5 = 3
age = 47, binNr = 47 / 5 = 9
Age value 4 will belong to the bin nr 0, age value 47 will belong to the bin nr 9 etc.

Variant 2 - local reduction in the map function

In Variant 2, the map function, aside from reading a single 'city' file, also performs a local reduction of age values and returns the local age distribution (i.e. age distribution of one city).

QVector<int> mapFile(const QString& fileName) {
    QFile file(fileName);
    file.open(QIODevice::ReadOnly);
    QTextStream in(&file);

    QVector<int> ageCounts;
    while (!in.atEnd()) {
        QString line = in.readLine();
        line = line.trimmed();
        if(!line.isEmpty()){
            bool iOk = false;
            int i = line.toInt(&iOk);
            if(iOk){
                int binNr = i / yearInterval;
                if(ageCounts.size() < binNr+1)
                    ageCounts.resize(binNr+1);
                ageCounts[binNr]++;
            }
        }
    }
    return ageCounts;
}

The reduce function then groups the existing city distributions into one large state distribution.

void reduce(QVector<int> &product,
                              const QVector<int> &ageCounts) {
    for(int i = 0; i < ageCounts.size(); i++){
        if(product.size() < ageCounts.size())
            product.resize(ageCounts.size());
        product[i]+= ageCounts.at(i);
    }
}

That's it! You can try out the computation. You should get the result below.

QVector(38, 36, 32, 29, 30, 30, 28, 24, 21, 16, 11, 8, 7, 6, 5, 4, 2, 1, 1)
@
@
@@
@@@@
@@@@@
@@@@@@
@@@@@@@
@@@@@@@@
@@@@@@@@@@@
@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

The histogram should be read horizontally with the highest age interval at the top and the lowest age interval at the bottom.

Latest update: 06.07.2016
Created: 2015
© Walletfox.com, 2017