# Review Selection, a small dataframe benchmark

Problem: Populate part of a "reviews" section for a restaurant listing page.  We tackle two tasks:

a. Select a few diverse reviews to show on the first page.
b. Calculate this restaurant's rank vs all other restaurants for
   each rating aspect: "atmosphere", "food", "speed", "location", and "friendliness."

The number of reviews per restaurant follows a Zipfian distribution, with the top restaurants having ~1k reviews, the average head restaurant having ~50 reviews, and a very long tail of restaurants with only 1 or 2 reviews each.

When serving the first set of reviews for a particular restaurant, we want to show informative reviews from users who held different opinions about how good the restaurant was, and why they thought so.  We approach this by choosing a few reviews for each star rating.

## Prelude / loading libraries

In [None]:
!pip install pandas numpy
!pip install polars
!pip install pyarrow
!pip install ray[default]
!pip install 'fsspec>=0.3.3'

In [None]:
import pandas as pd
import polars as pl
import numpy as np
from matplotlib import pyplot as plt
import platform
import pyarrow
from functools import partial
import psutil
import os

#os.environ["RUST_BACKTRACE"] = "1"
#os.environ["RUST_BACKTRACE"] = "full"

def diagnose_me():
  """Print the Python version followed by the versions of the key data libraries."""
  print(f"Python {platform.python_version()}")
  library_modules = {"Pandas": pd, "NumPy": np, "Polars": pl, "PyArrow": pyarrow}
  for label, module in library_modules.items():
    print(f"{label}: {module.__version__}")


diagnose_me()

## Synthetic Review Generation Code

In [None]:
from numpy.random import default_rng
# Shared, unseeded NumPy Generator: generated data differs from run to run.
# Some helpers below draw from it; others use the legacy global
# ``np.random`` state, which this Generator does not control.
rng = default_rng()

In [None]:
def multinomial_selection(choices, ps, num_samples, generator=None):
  """Choose num_samples from among the given choices according to the given probabilities.

  ``ps`` are treated as relative weights (one per choice) and normalized to
  sum to 1 before sampling.  NumPy's ``multinomial`` does NOT do this: it
  silently assigns any probability mass missing from ``pvals`` to the *last*
  outcome, so unnormalized weights such as ``[.4, .1, .05]`` would make the
  last choice the most likely one.

  Args:
    choices: sequence of candidate values.
    ps: non-negative weights, one per entry of ``choices``.
    num_samples: number of independent draws.
    generator: optional ``numpy.random.Generator``; defaults to the
      module-level ``rng``.

  Returns:
    A NumPy array of ``num_samples`` values drawn (with replacement) from
    ``choices``.

  Raises:
    ValueError: if ``ps`` and ``choices`` differ in length, a weight is
      negative, or the weights sum to zero.
  """
  gen = rng if generator is None else generator
  weights = np.asarray(ps, dtype=float)
  if weights.shape != (len(choices),):
    raise ValueError("ps must provide exactly one weight per choice")
  total = weights.sum()
  if (weights < 0).any() or total <= 0:
    raise ValueError("ps must be non-negative and sum to a positive value")
  # Each draw is a one-hot row; argmax recovers the chosen index.
  one_hot = gen.multinomial(1, weights / total, size=num_samples)
  return np.array(choices)[np.argmax(one_hot, axis=1)]

In [None]:
# Sanity check: expect mostly "beer", some "brats", and rarely "coffee".
multinomial_selection(["beer", "brats", "coffee"], [.8, .19, .01], 20)

In [None]:
def get_stars(restaurant_quality=0., num_reviews=30):
  """Sample ``num_reviews`` star ratings (floats 1.0 - 5.0) for one restaurant.

  ``restaurant_quality`` is multiplied by 3, clipped to [0, 2] and truncated
  to an int, which selects one of three canned per-star probability tables
  (poor / typical / great).  Draws come from the module-level ``rng``.
  """
  star_probabilities_by_tier = (
    (.2, .34, .35, .1, .01),   # poor restaurant
    (.1, .15, .1, .6, .05),    # typical restaurant
    (.01, .0, .01, .08, .90),  # great restaurant
  )
  tier = np.clip(restaurant_quality * 3., 0, 2).astype(int)
  one_hot_draws = rng.multinomial(1, star_probabilities_by_tier[tier], size=num_reviews)
  # Column index of the single 1 in each row is (stars - 1).
  zero_based_stars = np.argmax(one_hot_draws, axis=1)
  return zero_based_stars + 1.

In [None]:
# Star-rating histograms for a poor (quality .1), typical (.6) and great (.9)
# restaurant -- one get_stars probability table per panel.
fig, axs = plt.subplots(1, 3, figsize=(12,4))
_ = axs[0].hist(get_stars(restaurant_quality = .1), range=(1., 5.))
_ = axs[1].hist(get_stars(restaurant_quality = .6), range=(1., 5.))
_ = axs[2].hist(get_stars(restaurant_quality = .9), range=(1., 5.))

In [None]:
def get_review_quality(word_counts):
  """Sample a textual-quality score in [0., 1.] for each review word count.

  ``np.searchsorted([2, 120], word_counts)`` buckets the counts:
  <= 2 words -> short, 3-120 -> medium, > 120 -> long.  Each bucket has a
  probability table over five base scores (0, .25, .5, .75, 1.); a uniform
  jitter in [0, .25) is added and the result is clipped to [0, 1].
  Draws come from the module-level ``rng``.
  """
  tier_distributions = (
    (1., 0., 0., 0., 0.),      # short reviews
    (.1, .15, .1, .6, .05),    # medium reviews
    (.01, .0, .01, .08, .90),  # long reviews
  )
  tiers = np.searchsorted([2, 120], word_counts)
  base_scores = []
  for tier in tiers:
    # One multinomial draw per review, in review order.
    bucket = np.argmax(rng.multinomial(1, tier_distributions[tier]))
    base_scores.append(bucket / 4.)
  jitter = rng.random(len(word_counts)) * .25
  return np.clip(np.array(base_scores) + jitter, 0., 1.)

In [None]:
def get_age_weeks(num_reviews=20):
  """Sample ``num_reviews`` review ages in weeks.

  A Zipf(1.6) draw, clipped to [1, max_age_weeks - jitter_weeks], plus a
  uniform integer jitter in [0, jitter_weeks).  Results therefore lie in
  [1, 1016].  Uses NumPy's legacy global RNG (``np.random``).
  """
  max_age_weeks = 1017
  jitter_weeks = 26
  heavy_tail = np.random.zipf(1.6, num_reviews)
  heavy_tail = np.clip(heavy_tail, 1, max_age_weeks - jitter_weeks)
  jitter = np.random.randint(0, jitter_weeks, num_reviews)
  return heavy_tail + jitter

In [None]:
# Histogram of 100 sampled review ages, restricted to the first 120 weeks.
_ = plt.hist(get_age_weeks(100), range=(0,120))

In [None]:
def random_choice(choices, num_reviews):
  """Draw ``num_reviews`` items uniformly at random (with replacement) from ``choices``.

  Uses NumPy's legacy global RNG (``np.random``).  Returns a NumPy array
  with the dtype of ``np.array(choices)``.
  """
  picks = np.random.randint(0, len(choices), num_reviews)
  # Plain fancy indexing instead of ``np.choose``: ``np.choose`` treats each
  # choice as a separate array argument and rejects more than NPY_MAXARGS
  # of them, so it breaks for long choice lists.
  return np.array(choices)[picks]

In [None]:
# The rating aspects a review can be primarily about.
review_aspects = ["atmosphere", "food", "speed", "location", "friendliness"]


def get_topic(num_reviews):
  """Pick a uniformly random primary topic from ``review_aspects`` per review."""
  return random_choice(choices=review_aspects, num_reviews=num_reviews)

In [None]:
def generate_synthetic_review(restaurant_quality, num_reviews):
  """Generate ``num_reviews`` synthetic reviews for a single restaurant.

  Args:
    restaurant_quality: scalar forwarded to ``get_stars`` to choose the
      star distribution.
    num_reviews: number of reviews to generate.

  Returns:
    dict mapping column name to a length-``num_reviews`` sequence, suitable
    for ``pd.DataFrame(...)`` / ``pl.DataFrame(...)``:

      stars             - 1-5 (floats)
      reviewer_id       - int in [1000, 9999)
      review_age_weeks  - int >= 0
      review_month      - int (1-12)
      primary_topic     - string (categorical: atmosphere, food, speed, location, friendliness)
      kids_involved     - bool
      for_business      - bool (never True when kids_involved is True)
      text_length       - int (word count, 0-299)
      text_quality      - float (0.0 - 1.0)
      language          - string (2-letter language code: en, es, fr)
  """
  stars            = get_stars(restaurant_quality, num_reviews)
  reviewer_id      = np.random.randint(1000, 9999, num_reviews)
  review_age_weeks = get_age_weeks(num_reviews)
  # Convert the age from weeks to whole months (52 weeks ~ 12 months) and
  # count back from month 12.  Taking the *week* count modulo 12 would mix
  # units; integer arithmetic keeps the month an int as documented.
  months_ago       = (review_age_weeks * 12) // 52
  review_month     = 12 - np.mod(months_ago, 12)
  primary_topic    = get_topic(num_reviews)
  kids_involved    = np.random.binomial(1, .2, num_reviews).astype(bool)
  for_business     = np.random.binomial(1, .4, num_reviews).astype(bool) & ~kids_involved
  kids_involved    = [bool(x) for x in kids_involved]
  for_business     = [bool(x) for x in for_business]
  text_length      = np.random.randint(0, 300, num_reviews)
  text_quality     = get_review_quality(text_length)
  # The relative weights below sum to .55; NumPy's multinomial would hand
  # the missing mass to the last outcome, so normalize them explicitly.
  language_weights = np.array([.4, .1, .05])
  language         = multinomial_selection(["en", "es", "fr"],
                                           language_weights / language_weights.sum(),
                                           num_reviews)
  return {
    "stars": stars,
    "reviewer_id": reviewer_id,
    "review_age_weeks": review_age_weeks,
    "review_month": review_month,
    "primary_topic": primary_topic,
    "kids_involved": kids_involved,
    "for_business": for_business,
    "text_length": text_length,
    "text_quality": text_quality,
    "language": language,
  }

In [None]:
def generate_synthetic_reviews_dataset(num_listings=5000):
  """Generate synthetic reviews for ``num_listings`` restaurants.

  The number of reviews per listing is drawn from Zipf(2), and each listing
  gets a uniform random "goodness" in [0, 1) that is passed to
  ``generate_synthetic_review``.  Draws come from the module-level ``rng``.

  Returns:
    (reviews_dfs, reviews_pls): two parallel lists with one entry per
    listing, holding the same reviews as pandas and Polars DataFrames.  The
    Polars frames carry an extra ``index`` column (0..n-1) that mirrors the
    pandas row index.
  """
  num_reviews = rng.zipf(2, num_listings)
  restaurant_goodness = rng.random(num_listings)
  reviews_dfs = []
  reviews_pls = []
  for goodness, n in zip(restaurant_goodness, num_reviews):
    data = generate_synthetic_review(goodness, n)
    reviews_dfs.append(pd.DataFrame(data))
    # ``hstack`` returns a new frame and works across Polars versions;
    # column item assignment (``df['index'] = ...``) is rejected by current
    # Polars releases.
    index_column = pl.Series('index', list(range(n)))
    reviews_pls.append(pl.DataFrame(data).hstack([index_column]))
  return reviews_dfs, reviews_pls

# Aspect Ranking Code

In [None]:
# First, we generate a set of test data: 25,000 synthetic listings, returned
# as parallel lists of pandas and Polars DataFrames.
reviews_dfs, reviews_pls = generate_synthetic_reviews_dataset(25000)

In [None]:
def pd_calculate_per_aspect_star_rating(reviews, *, star_prior=3.0, aspects=None):
  """Return one listing's mean star rating per aspect, smoothed by a prior.

  One pseudo-review rated ``star_prior`` is added for every aspect before
  averaging. Aspects with few reviews are pulled toward the prior, and
  aspects with no reviews still get a value.

  Args:
    reviews: pandas DataFrame with at least ``primary_topic`` and ``stars``
      columns.
    star_prior: rating of the pseudo-review added for each aspect.
    aspects: aspect names to seed with the prior; defaults to
      ``review_aspects``.

  Returns:
    A one-row DataFrame (row label ``'stars'``) with one column per topic,
    in the sorted order produced by ``groupby``.
  """
  if aspects is None:
    aspects = review_aspects
  aspects = list(aspects)
  default_reviews = pd.DataFrame({'primary_topic': aspects,
                                  'stars': [star_prior] * len(aspects)})
  combined = pd.concat([reviews[['primary_topic', 'stars']], default_reviews])
  return combined.groupby('primary_topic').mean().transpose()

# Example: prior-smoothed per-aspect ratings for the first synthetic listing.
pd_calculate_per_aspect_star_rating(reviews_dfs[0])

In [None]:
def pd_get_aspect_ratings_for_listings(pd_reviews_per_listing):
  """Stack every listing's per-aspect ratings into one DataFrame.

  Row ``i`` holds the prior-smoothed per-aspect mean ratings (from
  ``pd_calculate_per_aspect_star_rating``) of listing ``i``.  The index is
  0..n-1, so positional (``iloc``) and label (``loc``) lookups by listing
  id agree.

  Returns an empty DataFrame when given no listings.
  """
  rating_summaries = [pd_calculate_per_aspect_star_rating(reviews)
                      for reviews in pd_reviews_per_listing]
  if not rating_summaries:
    # pd.concat raises on an empty list.
    return pd.DataFrame()
  # Every summary's single row is labelled 'stars'; renumber the rows so the
  # index is the listing id instead of a column of duplicate labels.
  return pd.concat(rating_summaries, axis=0, ignore_index=True)

# One row per listing, one column per aspect (prior-smoothed mean stars).
pd_listing_aspect_ratings = pd_get_aspect_ratings_for_listings(reviews_dfs)
pd_listing_aspect_ratings.head(n=4)

In [None]:
def pd_get_aspect_ranks_speedy(pd_listing_aspect_ratings, aspects=None):
  """Build a function returning a listing's global rank for each aspect.

  Args:
    pd_listing_aspect_ratings: DataFrame with one row per listing (row
      position == listing id) and one column per aspect.
    aspects: aspects to rank, in output column order; defaults to
      ``review_aspects``.

  Returns:
    ``get_ranks(listing_id)``, which returns a one-row DataFrame giving the
    listing's 1-based rank for each aspect. The rank is 1 + the number of
    listings rated strictly higher, so tied listings share the best rank
    ("1224" competition ranking).
  """
  # Note - this code is purposefully a bit inefficient to show
  # an outsized slowdown below.
  if aspects is None:
    aspects = review_aspects
  num_listings = pd_listing_aspect_ratings.shape[0]
  ordered = {}
  for col in pd_listing_aspect_ratings.columns:
    ordered[col] = pd_listing_aspect_ratings[col].sort_values()
  def get_ranks(listing_id):
    listing_ratings = pd_listing_aspect_ratings.iloc[listing_id]
    rv = {}
    for aspect in aspects:
      # searchsorted(side='right') counts listings rated <= this one, so the
      # difference counts listings rated strictly higher.  Add 1 so the
      # best listing is #1 rather than #0.
      num_better = num_listings - np.searchsorted(ordered[aspect], listing_ratings[aspect], side='right')
      rv[aspect] = [num_better + 1]
    return pd.DataFrame(rv)
  return get_ranks

In [None]:
# Build the rank lookup once, then time a single lookup with IPython's %timeit.
rank_getter = pd_get_aspect_ranks_speedy(pd_listing_aspect_ratings)
%timeit rank_getter(10)

In [None]:
def get_aspect_rank_descriptions(listing_id, num_listings, get_aspect_ranks_fn):
  """Render a listing's per-aspect ranks as one human-readable string.

  ``get_aspect_ranks_fn(listing_id)`` must return a DataFrame whose row
  label 0 holds one rank per aspect column.  The result looks like
  ``"#3 out of 100 for food; #7 out of 100 for speed"``, in column order.
  """
  ranks = get_aspect_ranks_fn(listing_id)
  parts = []
  for aspect in ranks.columns:
    rank = ranks.loc[0, aspect]
    parts.append(f"#{rank} out of {num_listings} for {aspect}")
  return "; ".join(parts)

In [None]:
# Describe listing 10's per-aspect ranks among all listings.  The third
# argument must be the rank-lookup function itself, i.e. the closure that
# pd_get_aspect_ranks_speedy returns for the ratings table.
print(get_aspect_rank_descriptions(10,
                                   len(pd_listing_aspect_ratings),
                                   pd_get_aspect_ranks_speedy(pd_listing_aspect_ratings)))

# Benchmarking Polars against Pandas

In [None]:
# Two versions of a groupby+head+sort for benchmarking: this one in
# Pandas (pd), with the Polars (pl) counterpart defined below.
def pd_get_representative_reviews(df, depth=2):
  """Return index labels of up to ``depth`` English reviews per star rating.

  English reviews are ordered by (stars, review_age_weeks). The first
  ``depth`` rows of each star rating are kept, i.e. those with the
  smallest ``review_age_weeks``. Labels come back in that order.
  """
  english_only = df[df.language == 'en']
  by_stars_then_age = english_only.sort_values(by=['stars', 'review_age_weeks'])
  heads = by_stars_then_age.groupby('stars').head(depth)
  return heads.index

print(f"Pandas: Getting representative reviews at 3 depths for {len(reviews_dfs)} listings.")
# Time selection at depths 1, 3 and 9 for every listing (IPython %time magic).
%time _ = [pd_get_representative_reviews(x, d) for x in reviews_dfs for d in [1,3,9]]

def pl_get_representative_reviews(df, depth=2):
  """Polars counterpart of ``pd_get_representative_reviews``.

  Keeps English reviews and takes, for each star rating, the ``depth``
  reviews with the smallest ``review_age_weeks``. Returns their ``index``
  column (the row numbers added when the frames were generated), ordered
  by (stars, review_age_weeks).

  Uses APIs available in both older and current Polars. Rows are selected
  with ``filter`` and a column expression instead of attribute access or
  boolean-mask indexing. Grouping uses whichever of ``group_by`` /
  ``groupby`` the installed version provides.
  """
  english = df.filter(pl.col('language') == 'en').sort(['stars', 'review_age_weeks'])
  # ``groupby`` was renamed ``group_by`` in Polars 0.19; use whichever exists.
  group_by = getattr(english, 'group_by', None) or english.groupby
  # Group order is not maintained by default, so re-sort after ``head``.
  heads = group_by('stars').head(depth).sort(['stars', 'review_age_weeks'])
  return heads['index']

print(f"Polars: Getting representative reviews at 3 depths for {len(reviews_pls)} listings.")
%time _ = [pl_get_representative_reviews(x) for x in reviews_pls for d in [1,3,9]]

On a two-core Google Colab instance, `Polars` takes a total of `5.1s` whereas `Pandas` takes a total of `17.9s`.  This is great!  However, on a 12-core workstation we see a *slowdown*.

In [None]:
import psutil

def process_listing(df,
                    listing_id,
                    get_reviews_fn, depth,
                    get_ranks_description_fn):
  """Render one listing: representative reviews plus an aspect-rank summary.

  Calls ``get_reviews_fn(df, depth)`` and then
  ``get_ranks_description_fn(listing_id)``. It then records the CPU core
  the process is running on via ``psutil.Process().cpu_num()``, which psutil
  only offers on some platforms (e.g. Linux).

  Returns:
    dict with keys ``'df'``, ``'listing_id'``, ``'cpu_id'`` and ``'html'``
    (the rendered text).
  """
  best_reviews = get_reviews_fn(df, depth)
  ranks_description = get_ranks_description_fn(listing_id)
  cpu_id = psutil.Process().cpu_num()
  html = (f'Listing {listing_id}:: {ranks_description}\n'
          f'Be sure to check out reviews {best_reviews}')
  return {'df': df, 'listing_id': listing_id, 'cpu_id': cpu_id, 'html': html}
                                
  
def pd_process_listing(df, listing_id, depth, get_ranks_description_fn):
  """Run ``process_listing`` with the pandas representative-review selector."""
  return process_listing(df, listing_id,
                         get_reviews_fn=pd_get_representative_reviews,
                         depth=depth,
                         get_ranks_description_fn=get_ranks_description_fn)
      
      
def pl_process_listing(df, listing_id, depth, get_ranks_description_fn):
  """Run ``process_listing`` with the Polars representative-review selector."""
  return process_listing(df, listing_id,
                         get_reviews_fn=pl_get_representative_reviews,
                         depth=depth,
                         get_ranks_description_fn=get_ranks_description_fn)


# Benchmark size: number of synthetic listings to generate and process.
bmsize=25000
pandas_reviews, polars_reviews = generate_synthetic_reviews_dataset(bmsize)

# This aspect rank finder is somewhat memory intensive: the ratings table
# alone is about 1MB (bmsize listings x 5 aspects x 8-byte floats), and the
# ranker also keeps a sorted copy of every aspect column.  It serves as
# reference data to demonstrate an interesting problem with multi-core
# machines.
pd_listing_aspect_ratings = pd_get_aspect_ratings_for_listings(pandas_reviews)
fast_ranker = pd_get_aspect_ranks_speedy(pd_listing_aspect_ratings)


def noop_description_maker(listing_id):
  """Baseline description maker: ignores ``listing_id`` and does no rank lookup."""
  description = "This place is fast!"
  return description


def fast_rank_description_maker(listing_id):
  """Describe ``listing_id``'s aspect ranks among all ``bmsize`` benchmark listings."""
  return get_aspect_rank_descriptions(listing_id=listing_id,
                                      num_listings=bmsize,
                                      get_aspect_ranks_fn=fast_ranker)


def cpu_history(processed_listings):
  """Summarize how often consecutive records ran on different CPU cores.

  Args:
    processed_listings: sequence of dicts with a ``'cpu_id'`` key,
      presumably in the order they were processed.

  Returns:
    A one-line summary with the record count and the number of adjacent
    record pairs whose ``cpu_id`` differs.
  """
  cpu_ids = [r["cpu_id"] for r in processed_listings]
  # Compare each record only with its predecessor.  (np.roll would also
  # compare the last record with the first, counting a spurious jump.)
  num_core_changes = sum(1 for prev, cur in zip(cpu_ids, cpu_ids[1:]) if prev != cur)
  return f"Over {len(cpu_ids)} records, computation jumped core at least {num_core_changes} times."

On a 2-core public Google Colab instance, the Polars version *just runs faster*.  However, try running the same code on a monster multi-core machine...

In [None]:
# NOTE(review): `timeit` is unused here; timing comes from IPython's %time.
import timeit
# Pandas: time the full per-listing pipeline at depths 1, 3 and 9, first
# with a constant description and then with per-aspect rank lookup.
for (label, postprocessing) in [('no post processing', noop_description_maker),
                                ('rank finding', fast_rank_description_maker),
                                ]:
  print(f'Pandas finding representative reviews for {bmsize} synthetic listings + {label}')
  %time pandas_results = [process_listing(df, listing_id, pd_get_representative_reviews, d, postprocessing) for (listing_id, df) in enumerate(pandas_reviews) for d in [1,3,9]]

# pandas_results holds only the last configuration run ('rank finding').
print(f"CPU History: {cpu_history(pandas_results)}")

# Polars: the same two configurations.
for (label, postprocessing) in [('no post processing', noop_description_maker),
                                ('rank finding', fast_rank_description_maker)
                               ]:
  print(f'Polars finding representative reviews for {bmsize} synthetic listings + {label}')
  %time polars_results = [process_listing(df, listing_id, pl_get_representative_reviews, d, postprocessing) for (listing_id, df) in enumerate(polars_reviews) for d in [1,3,9]]

# polars_results holds only the last configuration run ('rank finding').
print(f"CPU History: {cpu_history(polars_results)}")