Source code for salesanalyzer_mds.segment_revenue_share

import pandas as pd


[docs] def segment_revenue_share(sales_data: pd.DataFrame, price_col: str = 'UnitPrice', quantity_col: str = 'Quantity', price_thresholds: tuple = None) -> pd.DataFrame: """ Segments products into three categories—cheap, medium, and expensive— based on price and calculates their respective share in total revenue. Parameters: ----------- sales_data : pd.DataFrame DataFrame containing historical sales data. price_col : str Column containing product prices. Default is 'UnitPrice'. quantity_col : str Column containing quantities sold. Default is 'Quantity'. price_thresholds : tuple, optional User-defined price thresholds (cheap_threshold, expensive_threshold). If None, quantiles (0.33, 0.67) are used. Returns: -------- pd.DataFrame A DataFrame showing the total revenue share for each price segment: 'cheap', 'medium', 'expensive'. Raises: ------- ValueError: If the input DataFrame is empty or specified columns contain missing data. KeyError: If any of the specified columns are missing in the DataFrame. TypeError: If any of the columns contain invalid data types. """ # Check if input dataframe is empty if sales_data.empty: raise ValueError("Input DataFrame is empty.") # Check if required columns exist required_columns = {price_col, quantity_col} missing_columns = required_columns - set(sales_data.columns) if missing_columns: raise KeyError(f"Missing columns in input DataFrame: {missing_columns}") # Check for missing values if sales_data[price_col].isna().any() or sales_data[quantity_col].isna().any(): raise ValueError(f"{price_col} or {quantity_col} column contains missing values.") # Check for valid numeric types if not pd.api.types.is_numeric_dtype(sales_data[price_col]): raise TypeError(f"{price_col} must contain numeric data.") if not pd.api.types.is_numeric_dtype(sales_data[quantity_col]): raise TypeError(f"{quantity_col} must contain numeric data.") # Calculate revenue as price * quantity sales_data = sales_data.assign( Revenue=sales_data[price_col] * sales_data[quantity_col] ) # Determine price price_thresholds if price_thresholds is not None: cheap_threshold, expensive_threshold = price_thresholds else: sorted_prices = sales_data[price_col].sort_values() cheap_threshold = sorted_prices.quantile(0.33) expensive_threshold = sorted_prices.quantile(0.67) # Categorize prices based on price_thresholds def categorize_price(price): if price <= cheap_threshold: return 'cheap' elif price <= expensive_threshold: return 'medium' else: return 'expensive' sales_data = sales_data.assign( PriceSegment=sales_data[price_col].apply(categorize_price) ) # Calculate revenue share for each segment revenue_share = ( sales_data.groupby('PriceSegment')['Revenue'] .sum() .reset_index() .rename(columns={'Revenue': 'TotalRevenue'}) ) total_revenue = revenue_share['TotalRevenue'].sum() # Prevent division by zero revenue_share['RevenueShare (%)'] = ( ((revenue_share['TotalRevenue'] / total_revenue) * 100 if total_revenue > 0 else 0.0) ) # Round values for better readability revenue_share = revenue_share.round({'TotalRevenue': 2, 'RevenueShare (%)': 2}) # Ensure all segments are included, even if they have zero revenue segment_order = ['cheap', 'medium', 'expensive'] revenue_share = (revenue_share.set_index('PriceSegment') .reindex(segment_order, fill_value=0) .reset_index()) return revenue_share