GraphLab: Distributed Graph-Parallel API  2.1
Topic Modeling

The topic modeling toolkit contains a collection of applications targeted at clustering documents and extracting topical representations. The resulting topical representation can be used as a feature space in information retrieval tasks and to group topically related words and documents.

Currently the text modeling toolkit implements a fast asynchronous collapsed Gibbs sampler for the widely used Latent Dirichlet Allocation (LDA) model. In the near future we plan to add a Collapsed Variational Bayesian inference algorithm for the LDA model as well as some more general topic models.

The LDA Model

The LDA model associates a topic id with each token (word) in each document in the input corpus. Conceptually, topic ids correspond to semantic groups like "foods", "colors", and "politics" however the association between the id 1, 2, ..., N and the particular topic meaning "foods", "colors", ... is not know in advance and can be resolved by running the approximate inference algorithm. In addition the LDA model assigns a distribution over topics to each document and a distribution over term to each topic. The the topic id for each token is drawn from the topic distribution for each document. The actual word is then drawn from the term distribution for that topic. At a high-level the LDA model encodes the following intuitive assumptions:

  • Words in the same document are topically related.
  • Documents that share common terms are topically related.

Solving for the latent topic assignments of each token as well as the topic distribution for each document and the term distribution for each topic is a challenging (NP-Hard) task. Fortunately there are several approximate inference algorithms that typically can resolve coherent posterior estimates for the LDA model.

The Collapsed Gibbs Sampler

The topic modeling toolkit currently implements an asynchronous variant of the Collapsed Gibbs Sampler described by Griffiths and Steyvers in their landmark paper Finding Scientific Topics. The collapsed Gibbs sampler is a Markov Chain Monte Carlo (MCMC) algorithm which generates a sequence of topic assignments for each token that in the limit converge to a sequence of samples drawn from the posterior distribution. In practice the algorithm is run for a sufficient long time to allow the topics to "converge" (sometimes referred to as burn-in) and then the last few samples are used to estimate the posterior distribution over topics for each document and the posterior distribution over words for each topic.

The Parallel Collapsed Gibbs Sampler

The parallelization of the Collapsed Gibbs Sampler is achieved by drawing new assignments for multiple tokens simultaneously using a method that is similar to that described by Ahmed et al. (Paper). Unfortunately, the collapsed LDA model used to accelerate mixing of the Gibbs sampler also eliminates any conditional independence structure needed to obtain a parallel ergodic sampler as described by Gonzalez et al. (Paper)

However, by mapping the collapsed Gibbs sampler into the GraphLab abstraction we obtain a statistically more efficient algorithm. To implement the collapsed Gibbs sampler in GraphLab we construct a bipartite graph connecting each document with terms that occur in that document. Each edge contains the token count and latent topic assignments for that token. The GraphLab update function maintains the term and document counts during the gather and apply phases and then samples new values for the tokens on the scatter phase. We exploit local atomic integer operations and the GraphLab caching model to immediately propagate changes. The asynchronous consistency model ensures that only one token per document term pair is sampled at a time improving upon the original formulation of the asynchronous Gibbs sampler described by Ahmed et al. (Paper) or the sampler described by Asuncion et al. (Paper).

cgs_lda_usage Usage

The collapsed Gibbs sampler application (cgs_lda.cpp) takes as an input a text corpus represented as one or more token files. Each token consists of lines in the form:

<docid> <wordid> <count>

for example a file containing:

0    0     2
0    4     1
1    2     3

implies that the word with id 0 occurs twice in document 0, the word with id 4 occurs once in document 0, and the word with id 2 occurs three times in document 1.

On termination the system outputs for each term the number of occurrences of that term that have been assigned to each topic and for each document the number of tokens assigned to each document.


To demonstrate how the CGS LDA application works we have obtained a copy of the Daily Kos bag-of-words data from the UCI Repository and reformatted it for the cgs_lda application. You can download the reformatted data from here. Once extracted the folder contains:

> ls -lR daily_kos
total 120
-rw-r--r--  1 jegonzal  staff    904 Jul  1 22:37 README
-rw-r--r--@ 1 jegonzal  staff  55467 Jul  1 22:21 dictionary.txt
drwxr-xr-x  3 jegonzal  staff    102 Jul  1 22:21 tokens

total 7960
-rw-r--r--  1 jegonzal  staff  4074516 Jul  1 22:21 doc_word_count.tsv

To run the CGS_LDA GraphLab application on a single machine we simply run:

> ./cgs_lda --corpus ./daily_kos/tokens --dictionary ./daily_kos/dictionary.txt

This will run indefinitley display the top words in each topic every 10 seconds. To help visualize the output if you open the webpage


We render a word cloud webpage that connects directly to the cgs_lda appliction's internal web-server running on localhost port 8090.

In most cases we will also be interested in collecting the final assignments. This can be done by running:

> ./cgs_lda --corpus ./daily_kos/tokens --dictionary ./daily_kos/dictionary.txt \
   --word_dir word_counts --doc_dir doc_counts --burnin=60 

This will run the cgs_lda sampler for roughly 60 seconds and the save the counts of tokens in each topic for the words and documents in the files word_counts_x_of_x and doc_counts_x_of_x . If instead you would like to save the counts to seperate folders you can prepend the folder path.

By default GraphLab runs with two threads. However we can increase the parallelism on a single machine by increasing the number of threads:

> ./cgs_lda --corpus ./daily_kos/tokens --dictionary ./daily_kos/dictionary.txt \

The cgs_lda application can run in the distributed setting as well simply by using MPI to launch it:

> mpiexec --hostfile machine_list.txt -n 16 ./cgs_lda \
  --corpus ./daily_kos/tokens --dictionary ./daily_kos/dictionary.txt \

This will run 16 instances each consuming 8 on the machines in the machine_list.txt file. Each of these instances will automatically communicate splitting the work as well as the memory requirements. It is really that easy!

Since we are running in the distributed setting it is convenient to be able to read and write to a distributed filesystem. We have built HDFS support into the GraphLab abstraction therefore we can simply change the arguments to be:

> mpiexec --hostfile machine_list.txt -n 16 ./cgs_lda \
  --corpus hdfs:// \
  --dictionary hdfs:// \

Command Line Options

There are a wide range of options available when calling the cgs_lda program:

  • –corpus Required The path to the token file(s). This can be a folder, a file, or a folder plus a file prefix. In addition this can be a path in hdfs (e.g., hdfs://namenode/tokens/).
  • –help Display the help screen listing the available options
  • –dictionary (Optional) The path to the dictionary file. This can be a local path or a path on hdfs (e.g., hdfs://namenode/dictionary.txt). If no dictionary is provided then incremental topk words lists will not be generated and the word list webpage will not be available.
  • –engine (Optional, Default: asynchronous) The engine type to use when executing the vertex program. Accepted values are:
    • synchronous: All tokens are sampled simultaneously leading to faster computation but slower convergence.
    • asynchronous: Tokens are sampled as resources become available and token conditionals are updated immediately after sampling. This reduces parallelism and sampling speed but increases the rate of convergence.
  • –ntopics (Optional, Default: 50) The number of topics to use in the LDA model. Using fewer topics will increase speed and result in more "mixed" concepts. Using more topics will slow down sampling but lead to more specific topics.
  • –alpha (Optional, Default 1) The hyper-parameter for the topic distribution for each document. Larger values imply documents have a more uniform mix of topics. Smaller values (less than one) imply that documents are more focused on a small subsets of topics. Note that smaller values also slow down convergence of the sampler.
  • –beta (Optional, Default 0.1) The hyper-parameter for the word distribution in each topic. Larger values imply that topics contain all words equally and smaller values (less than one) imply that topics are focused on a small set of words. Note that smaller values also slow down convergence of the sampler.
  • –topk (Optional, Default 5) The number of words to show in each topic when incrementally listing the top words in each topic. This also affects the word cloud viewer.
  • –interval (Optional, Default 10) The time in seconds between when the incremental listing of top words is presented.
  • –max_count (Optional, Default 100) The maximum number of occurrences of a token in a document. If a token occurs more than max_count then it is reported as occurring max_count times. This ensures that overly frequent words do not dominate documents.
  • –loadjson (Optional, Default false) This flag is used to turn on the experimental JSON graph loader that reads graphs constructed using external graph builder libraries. If set to true then the –corpus argument must point to the JSON files.
  • –burnin (Optional, Default -1) The time in seconds to run the sampler before the sample is saved to file (and the sampler terminates). If the value is less than zero then the sample will run indefinitely.
  • –doc_dir (Optional, Default empty) The location (path/prefix) to save the final topic counts for each document after burnin. This can also be an hdfs path (e.g., hdfs://namenode/folder/prefix). If this is not set then the per document topic counts are not saved.
  • –word_dir (Optional, Default empty) The location (path/prefix) to save the final topic counts for each word after burnin. This can also be an hdfs path (e.g., hdfs://namenode/folder/prefix). If this is not set then the per word topic counts are not saved.
  • –ncpus (Optional, Default 2) The number of local computation threads to use on each machine. This should typically match the number of physical cores.
  • –scheduler (Optional, Default sweep) The scheduler to use when running with the asynchronous engine. The default is typically sufficient.
  • –engine_opts (Optional, Default empty) Any additional engine options. See –engine_help for a list of options.
  • –graph_opts (Optional, Default empty) Any additional graph options. See –graph_help for a list of options.
  • –scheduler_opts (Optional, Default empty) Any additional scheduler options. See –scheduler_help for a list of options.