# GPU-Accelerated Data Processing with RAPIDS

## What We're Building

A complete GPU-accelerated data science workflow using NVIDIA RAPIDS on Clore.ai. Process large datasets, train machine learning models, and run complex analytics on the GPU, often 10-100x faster than comparable CPU-based tools depending on the workload and hardware, all with a familiar pandas/scikit-learn-style API.

**Key Features:**

* cuDF: GPU DataFrame library (pandas API compatible)
* cuML: GPU machine learning (scikit-learn API compatible)
* cuGraph: GPU graph analytics
* Dask-cuDF: Multi-GPU distributed processing
* Automatic GPU provisioning via Clore.ai API
* Jupyter notebook support
* Cost-optimized spot instance usage

## Prerequisites

* Clore.ai account with API key ([get one here](https://clore.ai))
* Python 3.10+
* Basic pandas/scikit-learn knowledge

```bash
pip install requests paramiko scp jupyter
```

## Architecture Overview

```
┌─────────────────────────────────────────────────────────────┐
│                    NVIDIA RAPIDS Stack                       │
├─────────────────────────────────────────────────────────────┤
│  cuDF        │  cuML        │  cuGraph    │  cuSpatial      │
│  DataFrames  │  ML Models   │  Graph      │  Geospatial     │
├─────────────────────────────────────────────────────────────┤
│                        Dask-cuDF                             │
│                  (Multi-GPU Distribution)                    │
├─────────────────────────────────────────────────────────────┤
│                      CUDA / cuPy                             │
├─────────────────────────────────────────────────────────────┤
│                    Clore.ai GPU Server                       │
│               (RTX 4090 / A100 / A6000)                     │
└─────────────────────────────────────────────────────────────┘
```

## Step 1: Clore.ai RAPIDS Client

```python
# clore_rapids_client.py
import requests
import time
import secrets
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

@dataclass
class RAPIDSServer:
    """Represents a RAPIDS-enabled GPU server."""
    server_id: int
    order_id: int
    ssh_host: str
    ssh_port: int
    ssh_password: str
    jupyter_url: str
    gpu_model: str
    gpu_memory_gb: int
    hourly_cost: float


class CloreRAPIDSClient:
    """Clore.ai client for RAPIDS data science workloads."""
    
    BASE_URL = "https://api.clore.ai"
    
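    # RAPIDS Docker image. Check Docker Hub for this tag before use: recent
    # RAPIDS releases publish images as rapidsai/base and rapidsai/notebooks.
    RAPIDS_IMAGE = "rapidsai/rapidsai-core:24.02-cuda12.0-runtime-ubuntu22.04-py3.10"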
    
    # VRAM in GB per recognized GPU model. More specific names come first so
    # that "A100-40GB" is checked before the generic "A100" entry.
    GPU_VRAM = {
        "A100-80GB": 80, "A100-40GB": 40, "A100": 80,
        "A6000": 48, "RTX 4090": 24, "RTX 3090": 24,
        "A5000": 24, "RTX A4000": 16, "RTX 4080": 16,
        "RTX 3080 Ti": 12, "RTX 3080": 10
    }
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
    
    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make API request with retry logic."""
        url = f"{self.BASE_URL}{endpoint}"
        
        for attempt in range(3):
            try:
                response = requests.request(
                    method, url,
                    headers=self.headers,
                    timeout=30,
                    **kwargs
                )
                data = response.json()
                
                if data.get("code") == 5:
                    time.sleep(2 ** attempt)
                    continue
                
                if data.get("code") != 0:
                    raise Exception(f"API Error: {data}")
                return data
                
            except requests.exceptions.Timeout:
                if attempt == 2:
                    raise
                time.sleep(1)
        
        raise Exception("Max retries exceeded")
    
    def find_rapids_gpu(self,
                        min_vram_gb: int = 16,
                        max_price_usd: float = 0.80,
                        prefer_spot: bool = True) -> Optional[Dict]:
        """Find GPU suitable for RAPIDS workloads."""
        
        servers = self._request("GET", "/v1/marketplace")["servers"]
        
        candidates = []
        for server in servers:
            if server.get("rented"):
                continue
            
            gpu_array = server.get("gpu_array", [])
            
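            # Find a GPU with sufficient VRAM. Use the longest matching model
            # name so "A100-40GB" is not mistaken for a generic 80 GB "A100".
            gpu_match = None
            for gpu in gpu_array:
                gpu_name = max(
                    (name for name in self.GPU_VRAM if name in gpu),
                    key=len,
                    default=None,
                )
                if gpu_name and self.GPU_VRAM[gpu_name] >= min_vram_gb:
                    gpu_match = (gpu_name, self.GPU_VRAM[gpu_name])
                    break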
            
            if not gpu_match:
                continue
            
            price_data = server.get("price", {}).get("usd", {})
            price = price_data.get("spot" if prefer_spot else "on_demand_clore")
            
            if not price or price > max_price_usd:
                continue
            
            candidates.append({
                "id": server["id"],
                "gpus": gpu_array,
                "gpu_count": len(gpu_array),
                "gpu_model": gpu_match[0],
                "vram_gb": gpu_match[1],
                "total_vram_gb": gpu_match[1] * len(gpu_array),
                "price_usd": price,
                "reliability": server.get("reliability", 0)
            })
        
        if not candidates:
            return None
        
        # Sort by VRAM per dollar
        candidates.sort(key=lambda x: (-x["total_vram_gb"] / x["price_usd"], -x["reliability"]))
        return candidates[0]
    
    def rent_rapids_server(self,
                           server: Dict,
                           use_spot: bool = True,
                           jupyter_token: Optional[str] = None) -> RAPIDSServer:
        """Rent a server for RAPIDS workloads."""
        
        ssh_password = secrets.token_urlsafe(16)
        jupyter_token = jupyter_token or secrets.token_urlsafe(16)
        
        order_data = {
            "renting_server": server["id"],
            "type": "spot" if use_spot else "on-demand",
            "currency": "CLORE-Blockchain",
            "image": self.RAPIDS_IMAGE,
            "ports": {"22": "tcp", "8888": "http", "8787": "http"},
            "env": {
                "NVIDIA_VISIBLE_DEVICES": "all",
                "JUPYTER_TOKEN": jupyter_token,
                "RAPIDS_NO_INITIALIZE": "1"
            },
            "ssh_password": ssh_password,
            "jupyter_token": jupyter_token
        }
        
        if use_spot:
            order_data["spotprice"] = server["price_usd"] * 1.15
        
        result = self._request("POST", "/v1/create_order", json=order_data)
        order_id = result["order_id"]
        
        # Wait for server
        print(f"Waiting for RAPIDS server {server['id']}...")
        for _ in range(120):
            orders = self._request("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == order_id), None)
            
            if order and order.get("status") == "running":
                conn = order["connection"]
                ssh_parts = conn["ssh"].split()
                ssh_host = ssh_parts[1].split("@")[1] if "@" in ssh_parts[1] else ssh_parts[1]
                # Read the value that follows "-p" instead of assuming it is the last token
                ssh_port = int(ssh_parts[ssh_parts.index("-p") + 1]) if "-p" in ssh_parts else 22
                
                jupyter_url = conn.get("jupyter", f"http://{ssh_host}:8888/?token={jupyter_token}")
                
                return RAPIDSServer(
                    server_id=server["id"],
                    order_id=order_id,
                    ssh_host=ssh_host,
                    ssh_port=ssh_port,
                    ssh_password=ssh_password,
                    jupyter_url=jupyter_url,
                    gpu_model=server["gpu_model"],
                    gpu_memory_gb=server["total_vram_gb"],
                    hourly_cost=server["price_usd"]
                )
            
            time.sleep(2)
        
        # Cancel the pending order so a server that never starts is not billed
        self.cancel_order(order_id)
        raise Exception("Timeout waiting for server")
    
    def cancel_order(self, order_id: int):
        """Cancel a rental order."""
        self._request("POST", "/v1/cancel_order", json={"id": order_id})
```
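
The selection logic in `find_rapids_gpu` can be tried offline, without an API key. The following is a minimal sketch rather than the client's actual code path: `SAMPLE_SERVERS` is made-up data shaped like the marketplace fields the client reads (`rented`, `gpu_array`, `price.usd.spot`), and `match_gpu` and `rank_servers` are hypothetical helper names. For brevity it inspects only the first GPU of each server, which assumes homogeneous GPUs.

```python
from typing import Dict, List, Optional, Tuple

# VRAM in GB per model name (a subset of the client's GPU_VRAM table)
GPU_VRAM = {"A100-40GB": 40, "A100": 80, "RTX 4090": 24, "RTX 3080": 10}

# Hypothetical sample data, shaped like the fields the client reads
SAMPLE_SERVERS = [
    {"id": 1, "rented": False, "gpu_array": ["NVIDIA RTX 4090"] * 2,
     "price": {"usd": {"spot": 0.40}}},
    {"id": 2, "rented": False, "gpu_array": ["NVIDIA A100-40GB"],
     "price": {"usd": {"spot": 0.50}}},
    {"id": 3, "rented": True, "gpu_array": ["NVIDIA A100"],
     "price": {"usd": {"spot": 0.30}}},
    {"id": 4, "rented": False, "gpu_array": ["NVIDIA RTX 3080"],
     "price": {"usd": {"spot": 0.05}}},
]


def match_gpu(description: str) -> Optional[Tuple[str, int]]:
    """Return (model, vram_gb) for the longest model name found in the description."""
    name = max((n for n in GPU_VRAM if n in description), key=len, default=None)
    return (name, GPU_VRAM[name]) if name else None


def rank_servers(servers: List[Dict], min_vram_gb: int, max_price_usd: float) -> List[Dict]:
    """Keep unrented servers within budget and sort by total VRAM per dollar."""
    candidates = []
    for server in servers:
        if server.get("rented"):
            continue
        gpus = server.get("gpu_array", [])
        match = match_gpu(gpus[0]) if gpus else None  # assumes homogeneous GPUs
        price = server.get("price", {}).get("usd", {}).get("spot")
        if not match or match[1] < min_vram_gb or not price or price > max_price_usd:
            continue
        candidates.append({
            "id": server["id"],
            "gpu_model": match[0],
            "total_vram_gb": match[1] * len(gpus),
            "price_usd": price,
        })
    # Best value first: more total VRAM per dollar ranks higher
    candidates.sort(key=lambda c: c["total_vram_gb"] / c["price_usd"], reverse=True)
    return candidates


ranked = rank_servers(SAMPLE_SERVERS, min_vram_gb=16, max_price_usd=0.80)
for c in ranked:
    print(c["id"], c["gpu_model"], c["total_vram_gb"], c["price_usd"])
```

Matching on the longest name keeps a 40 GB A100 from being counted as an 80 GB card, which would otherwise inflate its VRAM-per-dollar score.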

## Step 2: RAPIDS Data Science Engine

```python
# rapids_engine.py
import paramiko
from scp import SCPClient
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

@dataclass
class AnalyticsResult:
    """Results from analytics operation."""
    operation: str
    execution_time_seconds: float
    rows_processed: int
    gpu_memory_used_gb: float
    result_data: Dict
    success: bool
    error: Optional[str] = None


class RAPIDSEngine:
    """Execute RAPIDS operations on remote GPU server."""
    
    def __init__(self, ssh_host: str, ssh_port: int, ssh_password: str):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_password = ssh_password
        self._ssh = None
        self._scp = None
    
    def connect(self):
        """Establish SSH connection."""
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(
            self.ssh_host,
            port=self.ssh_port,
            username="root",
            password=self.ssh_password,
            timeout=30
        )
        self._scp = SCPClient(self._ssh.get_transport())
    
    def disconnect(self):
        """Close connections."""
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
    
    def _exec(self, cmd: str, timeout: int = 3600) -> tuple:
        """Execute command on server."""
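        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        # Drain the output before waiting on the exit status; waiting first
        # can deadlock when the command fills the channel buffer.
        out = stdout.read().decode()
        err = stderr.read().decode()
        exit_code = stdout.channel.recv_exit_status()
        return out, err, exit_code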
    
    def upload_file(self, local_path: str, remote_path: str):
        """Upload file to server."""
        self._scp.put(local_path, remote_path)
    
    def download_file(self, remote_path: str, local_path: str):
        """Download file from server."""
        self._scp.get(remote_path, local_path)
    
    def verify_rapids(self) -> Dict:
        """Verify RAPIDS installation and GPU."""
        script = '''
import cudf
import cuml
import rmm
import subprocess

gpu_info = subprocess.check_output(
    ['nvidia-smi', '--query-gpu=name,memory.total,memory.free', '--format=csv,noheader']
).decode().strip()

print(f"RAPIDS_VERSION:{cudf.__version__}")
print(f"GPU_INFO:{gpu_info}")
'''
        
        self._exec(f"cat > /tmp/check.py << 'EOF'\n{script}\nEOF")
        out, err, code = self._exec("python3 /tmp/check.py")
        
        result = {"rapids_ready": code == 0}
        for line in out.strip().split("\n"):
            if ":" in line:
                key, val = line.split(":", 1)
                result[key.lower()] = val
        
        return result
    
    def run_analytics(self, script: str, description: str = "Analytics") -> AnalyticsResult:
        """Run a RAPIDS analytics script."""
        
        wrapped_script = f'''
import time
import json
import cudf
import cuml
import rmm

# Enable managed memory for large datasets
rmm.reinitialize(managed_memory=True)

start_time = time.time()
result = {{"success": True, "error": None, "data": {{}}}}

try:
{self._indent(script, 4)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)

result["execution_time"] = time.time() - start_time

# Get memory usage
import subprocess
mem_out = subprocess.check_output(
    ["nvidia-smi", "--query-gpu=memory.used", "--format=csv,noheader,nounits"]
).decode()
result["gpu_memory_mb"] = int(mem_out.strip().split("\\n")[0])

# default=str keeps non-JSON-native values from aborting the result line
print("RESULT:" + json.dumps(result, default=str))
'''
        
        self._exec(f"cat > /tmp/analytics.py << 'EOF'\n{wrapped_script}\nEOF")
        out, err, code = self._exec("python3 /tmp/analytics.py", timeout=3600)
        
        for line in out.strip().split("\n"):
            if line.startswith("RESULT:"):
                data = json.loads(line[7:])
                return AnalyticsResult(
                    operation=description,
                    execution_time_seconds=data.get("execution_time", 0),
                    rows_processed=data.get("rows_processed", 0),
                    gpu_memory_used_gb=data.get("gpu_memory_mb", 0) / 1024,
                    result_data=data.get("data", {}),
                    success=data.get("success", False),
                    error=data.get("error")
                )
        
        return AnalyticsResult(
            operation=description,
            execution_time_seconds=0,
            rows_processed=0,
            gpu_memory_used_gb=0,
            result_data={},
            success=False,
            error=err or "Unknown error"
        )
    
    def _indent(self, s: str, n: int = 4) -> str:
        return "\n".join(" " * n + line for line in s.split("\n"))
    
    # --- cuDF Operations ---
    
    def load_csv(self, file_path: str) -> AnalyticsResult:
        """Load CSV file using cuDF."""
        script = f'''
df = cudf.read_csv("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["dtypes"] = {{str(k): str(v) for k, v in df.dtypes.items()}}
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load CSV")
    
    def load_parquet(self, file_path: str) -> AnalyticsResult:
        """Load Parquet file using cuDF."""
        script = f'''
df = cudf.read_parquet("{file_path}")
result["rows_processed"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["shape"] = list(df.shape)
result["data"]["memory_mb"] = df.memory_usage(deep=True).sum() / (1024**2)
'''
        return self.run_analytics(script, "Load Parquet")
    
    def describe_data(self, file_path: str) -> AnalyticsResult:
        """Get statistical description of data."""
        script = f'''
df = cudf.read_parquet("{file_path}") if "{file_path}".endswith(".parquet") else cudf.read_csv("{file_path}")
result["rows_processed"] = len(df)

# Get describe for numeric columns
desc = df.describe().to_pandas().to_dict()
result["data"]["describe"] = desc
result["data"]["null_counts"] = df.isnull().sum().to_pandas().to_dict()
'''
        return self.run_analytics(script, "Describe Data")
    
    def filter_data(self, input_path: str, output_path: str, condition: str) -> AnalyticsResult:
        """Filter data by condition."""
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
initial_rows = len(df)

df = df.query({condition!r})
result["rows_processed"] = initial_rows
result["data"]["input_rows"] = initial_rows
result["data"]["output_rows"] = len(df)
result["data"]["filtered_percent"] = (1 - len(df) / initial_rows) * 100 if initial_rows else 0.0

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Filter: {condition}")
    
    def groupby_aggregate(self, 
                          input_path: str, 
                          output_path: str,
                          group_cols: List[str],
                          agg_dict: Dict[str, str]) -> AnalyticsResult:
        """Group by and aggregate."""
        group_str = ", ".join(f'"{c}"' for c in group_cols)
        agg_str = ", ".join(f'"{k}": "{v}"' for k, v in agg_dict.items())
        
        script = f'''
df = cudf.read_parquet("{input_path}") if "{input_path}".endswith(".parquet") else cudf.read_csv("{input_path}")
result["data"]["input_rows"] = len(df)

df_agg = df.groupby([{group_str}]).agg({{{agg_str}}}).reset_index()
result["rows_processed"] = len(df)
result["data"]["output_rows"] = len(df_agg)
result["data"]["groups"] = len(df_agg)

df_agg.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"GroupBy {group_cols}")
    
    def join_data(self,
                  left_path: str,
                  right_path: str,
                  output_path: str,
                  on: str,
                  how: str = "inner") -> AnalyticsResult:
        """Join two datasets."""
        script = f'''
left = cudf.read_parquet("{left_path}") if "{left_path}".endswith(".parquet") else cudf.read_csv("{left_path}")
right = cudf.read_parquet("{right_path}") if "{right_path}".endswith(".parquet") else cudf.read_csv("{right_path}")

result["data"]["left_rows"] = len(left)
result["data"]["right_rows"] = len(right)
result["rows_processed"] = len(left) + len(right)

merged = left.merge(right, on="{on}", how="{how}")
result["data"]["output_rows"] = len(merged)

merged.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"Join on {on}")
    
    # --- cuML Operations ---
    
    def train_linear_regression(self,
                                data_path: str,
                                target_col: str,
                                feature_cols: List[str],
                                model_path: str) -> AnalyticsResult:
        """Train linear regression model."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.linear_model import LinearRegression
import pickle

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]]
y = df["{target_col}"]

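# Return CuPy arrays so coef_ supports .tolist() (a cuDF Series does not)
model = LinearRegression(output_type="cupy")
model.fit(X, y)

# R^2 on the training data, so it overstates accuracy on unseen data
result["data"]["r2_score"] = float(model.score(X, y))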
result["data"]["coefficients"] = model.coef_.tolist()
result["data"]["intercept"] = float(model.intercept_)

# Save model
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Linear Regression")
    
    def train_random_forest(self,
                            data_path: str,
                            target_col: str,
                            feature_cols: List[str],
                            model_path: str,
                            n_estimators: int = 100,
                            max_depth: int = 16) -> AnalyticsResult:
        """Train a random forest classifier."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.ensemble import RandomForestClassifier
import pickle
import numpy as np

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")
y = df["{target_col}"].astype("int32")

model = RandomForestClassifier(
    n_estimators={n_estimators},
    max_depth={max_depth},
    random_state=42
)
model.fit(X, y)

# Accuracy on the training data; evaluate on a held-out split for a realistic estimate
accuracy = float(model.score(X, y))
result["data"]["accuracy"] = accuracy
result["data"]["n_estimators"] = {n_estimators}
result["data"]["max_depth"] = {max_depth}

with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        return self.run_analytics(script, "Train Random Forest")
    
    def train_kmeans(self,
                     data_path: str,
                     feature_cols: List[str],
                     n_clusters: int,
                     output_path: str) -> AnalyticsResult:
        """Train K-Means clustering."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.cluster import KMeans

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")

kmeans = KMeans(n_clusters={n_clusters}, random_state=42)
labels = kmeans.fit_predict(X)

df["cluster"] = labels
result["data"]["n_clusters"] = {n_clusters}
result["data"]["inertia"] = float(kmeans.inertia_)
result["data"]["cluster_sizes"] = df["cluster"].value_counts().to_pandas().to_dict()

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"KMeans ({n_clusters} clusters)")
    
    def train_pca(self,
                  data_path: str,
                  feature_cols: List[str],
                  n_components: int,
                  output_path: str) -> AnalyticsResult:
        """Perform PCA dimensionality reduction."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
from cuml.decomposition import PCA

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

X = df[[{features_str}]].astype("float32")

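# Return CuPy arrays so the result can be sliced by column and
# explained_variance_ratio_ supports .tolist()
pca = PCA(n_components={n_components}, output_type="cupy")
X_transformed = pca.fit_transform(X)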

# Create output dataframe
for i in range({n_components}):
    df[f"PC{{i+1}}"] = X_transformed[:, i]

result["data"]["n_components"] = {n_components}
result["data"]["explained_variance_ratio"] = pca.explained_variance_ratio_.tolist()
result["data"]["total_variance_explained"] = sum(pca.explained_variance_ratio_.tolist())

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, f"PCA ({n_components} components)")
    
    def predict(self,
                model_path: str,
                data_path: str,
                feature_cols: List[str],
                output_path: str) -> AnalyticsResult:
        """Make predictions using saved model."""
        features_str = ", ".join(f'"{c}"' for c in feature_cols)
        
        script = f'''
import pickle

df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["rows_processed"] = len(df)

with open("{model_path}", "rb") as f:
    model = pickle.load(f)

X = df[[{features_str}]]
predictions = model.predict(X)

df["prediction"] = predictions
result["data"]["prediction_count"] = len(predictions)

df.to_parquet("{output_path}")
'''
        return self.run_analytics(script, "Predict")
```
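
The wrapping technique in `run_analytics` (indent the user snippet into a `try` block, then print a single `RESULT:` line of JSON) can be tested without a GPU or SSH. The sketch below is a simplified local stand-in, not the engine itself: `wrap_script`, `parse_result`, and `run_locally` are hypothetical names, the cuDF/RMM setup is omitted, and `exec` replaces the remote `python3` call.

```python
import contextlib
import io
import json
import textwrap
from typing import Optional


def wrap_script(body: str) -> str:
    """Wrap a snippet so it always prints exactly one RESULT: line."""
    indented = textwrap.indent(textwrap.dedent(body).strip("\n") or "pass", "    ")
    return "\n".join([
        "import json, time",
        "start = time.time()",
        'result = {"success": True, "error": None, "data": {}}',
        "try:",
        indented,
        "except Exception as e:",
        '    result["success"] = False',
        '    result["error"] = str(e)',
        'result["execution_time"] = time.time() - start',
        'print("RESULT:" + json.dumps(result, default=str))',
    ])


def parse_result(stdout: str) -> Optional[dict]:
    """Return the decoded RESULT payload, or None if the marker is missing."""
    for line in stdout.splitlines():
        if line.startswith("RESULT:"):
            return json.loads(line[len("RESULT:"):])
    return None


def run_locally(script: str) -> str:
    """Stand-in for the SSH round trip: execute the script and capture stdout."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        exec(script, {"__name__": "__main__"})
    return buffer.getvalue()


ok = parse_result(run_locally(wrap_script('''
    values = list(range(5))
    result["rows_processed"] = len(values)
    result["data"]["total"] = sum(values)
''')))
failed = parse_result(run_locally(wrap_script("1 / 0")))
print(ok["success"], ok["data"])
print(failed["success"], failed["error"])
```

Because errors are caught inside the wrapper, a failing snippet still produces a `RESULT:` line, so the caller can tell a script error apart from a transport failure where no marker arrives at all.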

## Step 3: Complete Data Science Pipeline

```python
# rapids_pipeline.py
import os
import time
from typing import List, Dict, Optional
from dataclasses import dataclass

from clore_rapids_client import CloreRAPIDSClient, RAPIDSServer
from rapids_engine import RAPIDSEngine, AnalyticsResult

@dataclass
class PipelineStats:
    """Statistics for the pipeline."""
    total_operations: int
    successful_operations: int
    total_rows_processed: int
    total_time_seconds: float
    total_cost_usd: float
    throughput_rows_per_sec: float


class RAPIDSPipeline:
    """High-level data science pipeline using RAPIDS."""
    
    def __init__(self, api_key: str):
        self.client = CloreRAPIDSClient(api_key)
        self.server: Optional[RAPIDSServer] = None
        self.engine: Optional[RAPIDSEngine] = None
        self.results: List[AnalyticsResult] = []
    
    def setup(self, 
              min_vram_gb: int = 16,
              max_price_usd: float = 0.80):
        """Provision RAPIDS server."""
        
        print("🔍 Finding RAPIDS GPU...")
        gpu = self.client.find_rapids_gpu(
            min_vram_gb=min_vram_gb,
            max_price_usd=max_price_usd
        )
        
        if not gpu:
            raise Exception(f"No GPU with {min_vram_gb}GB+ under ${max_price_usd}/hr")
        
        print(f"   {gpu['gpu_model']} ({gpu['total_vram_gb']}GB) @ ${gpu['price_usd']:.2f}/hr")
        
        print("🚀 Provisioning server...")
        self.server = self.client.rent_rapids_server(gpu)
        
        print(f"   SSH: {self.server.ssh_host}:{self.server.ssh_port}")
        print(f"   Jupyter: {self.server.jupyter_url}")
        
        # Connect engine
        self.engine = RAPIDSEngine(
            self.server.ssh_host,
            self.server.ssh_port,
            self.server.ssh_password
        )
        self.engine.connect()
        
        # Verify
        print("🔧 Verifying RAPIDS...")
        info = self.engine.verify_rapids()
        print(f"   Version: {info.get('rapids_version', 'N/A')}")
        
        return self
    
    def upload_data(self, local_path: str, remote_path: str = None) -> str:
        """Upload data file."""
        if remote_path is None:
            remote_path = f"/tmp/data/{os.path.basename(local_path)}"
        
        self.engine._exec(f"mkdir -p {os.path.dirname(remote_path)}")
        print(f"📤 Uploading {os.path.basename(local_path)}...")
        self.engine.upload_file(local_path, remote_path)
        return remote_path
    
    def download_data(self, remote_path: str, local_path: str):
        """Download data file."""
        print(f"📥 Downloading {os.path.basename(remote_path)}...")
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        self.engine.download_file(remote_path, local_path)
    
    def run(self, script: str, description: str = "Custom Operation") -> AnalyticsResult:
        """Run custom RAPIDS script."""
        result = self.engine.run_analytics(script, description)
        self.results.append(result)
        
        if result.success:
            print(f"   ✅ {description}: {result.rows_processed:,} rows in {result.execution_time_seconds:.2f}s")
        else:
            print(f"   ❌ {description}: {result.error}")
        
        return result
    
    def load(self, file_path: str) -> AnalyticsResult:
        """Load data file."""
        if file_path.endswith(".parquet"):
            result = self.engine.load_parquet(file_path)
        else:
            result = self.engine.load_csv(file_path)
        self.results.append(result)
        return result
    
    def describe(self, file_path: str) -> AnalyticsResult:
        """Get data statistics."""
        result = self.engine.describe_data(file_path)
        self.results.append(result)
        return result
    
    def filter(self, input_path: str, output_path: str, condition: str) -> AnalyticsResult:
        """Filter data."""
        result = self.engine.filter_data(input_path, output_path, condition)
        self.results.append(result)
        return result
    
    def groupby(self, input_path: str, output_path: str, 
                group_cols: List[str], aggs: Dict[str, str]) -> AnalyticsResult:
        """Group by and aggregate."""
        result = self.engine.groupby_aggregate(input_path, output_path, group_cols, aggs)
        self.results.append(result)
        return result
    
    def join(self, left: str, right: str, output: str, on: str, how: str = "inner") -> AnalyticsResult:
        """Join datasets."""
        result = self.engine.join_data(left, right, output, on, how)
        self.results.append(result)
        return result
    
    def train_model(self, 
                    model_type: str,
                    data_path: str,
                    target: str,
                    features: List[str],
                    model_path: str,
                    **kwargs) -> AnalyticsResult:
        """Train ML model."""
        if model_type == "linear_regression":
            result = self.engine.train_linear_regression(data_path, target, features, model_path)
        elif model_type == "random_forest":
            result = self.engine.train_random_forest(
                data_path, target, features, model_path,
                n_estimators=kwargs.get("n_estimators", 100),
                max_depth=kwargs.get("max_depth", 16)
            )
        else:
            raise ValueError(f"Unknown model type: {model_type}")
        
        self.results.append(result)
        return result
    
    def cluster(self, data_path: str, features: List[str], 
                n_clusters: int, output_path: str) -> AnalyticsResult:
        """K-Means clustering."""
        result = self.engine.train_kmeans(data_path, features, n_clusters, output_path)
        self.results.append(result)
        return result
    
    def pca(self, data_path: str, features: List[str],
            n_components: int, output_path: str) -> AnalyticsResult:
        """PCA dimensionality reduction."""
        result = self.engine.train_pca(data_path, features, n_components, output_path)
        self.results.append(result)
        return result
    
    def predict(self, model_path: str, data_path: str,
                features: List[str], output_path: str) -> AnalyticsResult:
        """Make predictions."""
        result = self.engine.predict(model_path, data_path, features, output_path)
        self.results.append(result)
        return result
    
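    def get_stats(self) -> PipelineStats:
        """Get pipeline statistics.

        The cost figure covers script execution time only; provisioning,
        transfers, and idle rental time are billed but not counted here.
        """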
        successful = [r for r in self.results if r.success]
        total_rows = sum(r.rows_processed for r in successful)
        total_time = sum(r.execution_time_seconds for r in self.results)
        
        return PipelineStats(
            total_operations=len(self.results),
            successful_operations=len(successful),
            total_rows_processed=total_rows,
            total_time_seconds=total_time,
            total_cost_usd=(total_time / 3600) * self.server.hourly_cost if self.server else 0,
            throughput_rows_per_sec=total_rows / total_time if total_time > 0 else 0
        )
    
    def cleanup(self):
        """Release resources."""
        if self.engine:
            self.engine.disconnect()
        if self.server:
            print("🧹 Releasing server...")
            self.client.cancel_order(self.server.order_id)
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()
```
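
A typical session with the pipeline might look like the sketch below. It is an illustration under assumptions, not part of the original code: `run_clustering_job` is a hypothetical helper, the remote and local output paths are placeholders, and the feature names you pass must exist in your dataset. It accepts any object exposing the `RAPIDSPipeline` methods defined above.

```python
def run_clustering_job(pipeline, local_file: str, features: list, n_clusters: int = 3):
    """Upload a file, cluster it, download the result, and return pipeline stats."""
    remote_out = "/tmp/data/clustered.parquet"  # hypothetical output location
    # Entering the context first means __exit__ calls cleanup() and releases
    # the server even if setup or a later step raises.
    with pipeline:
        pipeline.setup(min_vram_gb=16, max_price_usd=0.80)
        remote_in = pipeline.upload_data(local_file)
        pipeline.describe(remote_in)
        result = pipeline.cluster(remote_in, features, n_clusters, remote_out)
        if result.success:
            pipeline.download_data(remote_out, "results/clustered.parquet")
        return pipeline.get_stats()
```

With the Step 3 module importable, a call could be `run_clustering_job(RAPIDSPipeline(api_key), "dataset.parquet", ["x", "y"])`, where `"x"` and `"y"` stand in for real column names. Calling `setup` inside the `with` block matters because a rented server keeps billing until its order is cancelled.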

## Full Script: End-to-End Data Science

```python
#!/usr/bin/env python3
"""
RAPIDS Data Science Pipeline on Clore.ai.

Usage:
    python rapids_datascience.py --api-key YOUR_API_KEY --data dataset.parquet \
        --operation describe|cluster|train|predict
"""

import os
import sys
import time
import json
import argparse
import secrets
import requests
import paramiko
from scp import SCPClient
from typing import List, Dict


class CloreRAPIDSDataScience:
    """Complete RAPIDS data science solution on Clore.ai."""
    
    BASE_URL = "https://api.clore.ai"
    RAPIDS_IMAGE = "rapidsai/rapidsai-core:24.02-cuda12.0-runtime-ubuntu22.04-py3.10"
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {"auth": api_key}
        self.order_id = None
        self.ssh_host = None
        self.ssh_port = None
        self.ssh_password = None
        self.hourly_cost = 0.0
        self._ssh = None
        self._scp = None
    
    def _api(self, method: str, endpoint: str, **kwargs) -> Dict:
        url = f"{self.BASE_URL}{endpoint}"
        for attempt in range(3):
            # A timeout keeps a stalled connection from hanging the script
            response = requests.request(method, url, headers=self.headers, timeout=30, **kwargs)
            data = response.json()
            if data.get("code") == 5:
                time.sleep(2 ** attempt)
                continue
            if data.get("code") != 0:
                raise Exception(f"API Error: {data}")
            return data
        raise Exception("Max retries")
    
    def setup(self, min_vram: int = 16, max_price: float = 0.80):
        print("🔍 Finding RAPIDS GPU...")
        servers = self._api("GET", "/v1/marketplace")["servers"]
        
        gpu_vram = {"A100": 80, "RTX 4090": 24, "RTX 3090": 24, "A6000": 48}
        
        candidates = []
        for s in servers:
            if s.get("rented"):
                continue
            gpus = s.get("gpu_array", [])
            
            match = None
            for gpu in gpus:
                for g, vram in gpu_vram.items():
                    if g in gpu and vram >= min_vram:
                        match = (g, vram)
                        break
                if match:
                    break  # stop at the first qualifying GPU
            
            if not match:
                continue
            
            price = s.get("price", {}).get("usd", {}).get("spot")
            if price and price <= max_price:
                candidates.append({
                    "id": s["id"], "gpu": match[0], "vram": match[1],
                    "price": price
                })
        
        if not candidates:
            raise Exception(f"No GPU with {min_vram}GB+ under ${max_price}/hr")
        
        gpu = max(candidates, key=lambda x: x["vram"] / x["price"])
        print(f"   {gpu['gpu']} ({gpu['vram']}GB) @ ${gpu['price']:.2f}/hr")
        
        self.ssh_password = secrets.token_urlsafe(16)
        self.hourly_cost = gpu["price"]
        
        print("🚀 Provisioning...")
        order_data = {
            "renting_server": gpu["id"],
            "type": "spot",
            "currency": "CLORE-Blockchain",
            "image": self.RAPIDS_IMAGE,
            "ports": {"22": "tcp", "8888": "http"},
            "env": {"NVIDIA_VISIBLE_DEVICES": "all"},
            "ssh_password": self.ssh_password,
            "spotprice": gpu["price"] * 1.15
        }
        
        result = self._api("POST", "/v1/create_order", json=order_data)
        self.order_id = result["order_id"]
        
        print("⏳ Waiting...")
        for _ in range(120):
            orders = self._api("GET", "/v1/my_orders")["orders"]
            order = next((o for o in orders if o["order_id"] == self.order_id), None)
            if order and order.get("status") == "running":
                conn = order["connection"]["ssh"]
                parts = conn.split()
                self.ssh_host = parts[1].split("@")[1]
                self.ssh_port = int(parts[-1]) if "-p" in conn else 22
                break
            time.sleep(2)
        else:
            raise Exception("Timeout")
        
        self._ssh = paramiko.SSHClient()
        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._ssh.connect(self.ssh_host, port=self.ssh_port,
                          username="root", password=self.ssh_password, timeout=30)
        self._scp = SCPClient(self._ssh.get_transport())
        
        print(f"✅ Ready: {self.ssh_host}:{self.ssh_port}")
    
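    def _exec(self, cmd: str, timeout: int = 3600) -> str:
        stdin, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout)
        # Read the output before waiting on the exit status; waiting first can
        # block when the remote process produces more output than the channel buffers.
        out = stdout.read().decode()
        err = stderr.read().decode()
        if stdout.channel.recv_exit_status() != 0:
            raise RuntimeError(f"Remote command failed: {err.strip() or out.strip()}")
        return out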
    
    def upload(self, local: str) -> str:
        self._exec("mkdir -p /tmp/data /tmp/output /tmp/models")
        remote = f"/tmp/data/{os.path.basename(local)}"
        print(f"📤 Uploading {os.path.basename(local)}...")
        self._scp.put(local, remote)
        return remote
    
    def download(self, remote: str, local: str):
        os.makedirs(os.path.dirname(local) or ".", exist_ok=True)
        self._scp.get(remote, local)
    
    def run_script(self, script: str) -> Dict:
        """Run `script` remotely inside a wrapper that prints one `OUT:<json>` result line."""
        wrapped = f'''
import cudf, cuml, json, time, rmm
rmm.reinitialize(managed_memory=True)
start = time.time()
result = {{"success": True, "data": {{}}}}
try:
{self._indent(script)}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)
result["time"] = time.time() - start
print("OUT:" + json.dumps(result))
'''
        self._exec(f"cat > /tmp/run.py << 'EOF'\n{wrapped}\nEOF")
        out = self._exec("python3 /tmp/run.py")
        
        for line in out.split("\n"):
            if line.startswith("OUT:"):
                return json.loads(line[4:])
        return {"success": False, "error": "Parse error"}
    
    def _indent(self, s: str, n: int = 4) -> str:
        return "\n".join(" " * n + line for line in s.split("\n"))
    
    def describe(self, data_path: str) -> Dict:
        script = f'''
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
result["data"]["rows"] = len(df)
result["data"]["columns"] = list(df.columns)
result["data"]["dtypes"] = {{str(k): str(v) for k, v in df.dtypes.items()}}
desc = df.describe().to_pandas().to_dict()
result["data"]["describe"] = desc
'''
        print("📊 Describing data...")
        return self.run_script(script)
    
    def cluster(self, data_path: str, features: List[str], n_clusters: int, output: str) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
from cuml.cluster import KMeans
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
X = df[[{features_str}]].astype("float32")
kmeans = KMeans(n_clusters={n_clusters}, random_state=42)
df["cluster"] = kmeans.fit_predict(X)
result["data"]["inertia"] = float(kmeans.inertia_)
result["data"]["cluster_sizes"] = df["cluster"].value_counts().to_pandas().to_dict()
df.to_parquet("{output}")
'''
        print(f"🔮 Clustering ({n_clusters} clusters)...")
        return self.run_script(script)
    
    def train_rf(self, data_path: str, target: str, features: List[str], 
                 model_path: str, n_estimators: int = 100) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
from cuml.ensemble import RandomForestClassifier
import pickle
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
X = df[[{features_str}]].astype("float32")
y = df["{target}"].astype("int32")
model = RandomForestClassifier(n_estimators={n_estimators}, max_depth=16, random_state=42)
model.fit(X, y)
# Note: this is accuracy on the training data, not on a held-out set.
result["data"]["accuracy"] = float(model.score(X, y))
with open("{model_path}", "wb") as f:
    pickle.dump(model, f)
'''
        print(f"🌲 Training Random Forest ({n_estimators} trees)...")
        return self.run_script(script)
    
    def predict(self, model_path: str, data_path: str, features: List[str], output: str) -> Dict:
        features_str = ", ".join(f'"{f}"' for f in features)
        script = f'''
import pickle
df = cudf.read_parquet("{data_path}") if "{data_path}".endswith(".parquet") else cudf.read_csv("{data_path}")
with open("{model_path}", "rb") as f:
    model = pickle.load(f)
X = df[[{features_str}]].astype("float32")
df["prediction"] = model.predict(X)
result["data"]["predictions"] = len(df)
df.to_parquet("{output}")
'''
        print("🎯 Making predictions...")
        return self.run_script(script)
    
    def cleanup(self):
        if self._scp:
            self._scp.close()
        if self._ssh:
            self._ssh.close()
        if self.order_id:
            print("🧹 Releasing...")
            self._api("POST", "/v1/cancel_order", json={"id": self.order_id})
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.cleanup()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--api-key", required=True)
    parser.add_argument("--data", required=True)
    parser.add_argument("--operation", choices=["describe", "cluster", "train", "predict"], required=True)
    parser.add_argument("--features", help="Comma-separated feature columns")
    parser.add_argument("--target", help="Target column for training")
    parser.add_argument("--clusters", type=int, default=5)
    parser.add_argument("--output", default="./output.parquet")
    parser.add_argument("--model", default="/tmp/models/model.pkl")
    parser.add_argument("--min-vram", type=int, default=16)
    parser.add_argument("--max-price", type=float, default=0.80)
    args = parser.parse_args()
    
    start = time.time()
    
    with CloreRAPIDSDataScience(args.api_key) as ds:
        ds.setup(args.min_vram, args.max_price)
        
        # Upload data
        remote_data = ds.upload(args.data)
        
        features = args.features.split(",") if args.features else []
        
        if args.operation == "describe":
            result = ds.describe(remote_data)
            print(f"\n📋 Data Summary:")
            print(f"   Rows: {result['data'].get('rows', 0):,}")
            print(f"   Columns: {result['data'].get('columns', [])}")
            
        elif args.operation == "cluster":
            if not features:
                raise ValueError("--features required for clustering")
            output = f"/tmp/output/{os.path.basename(args.output)}"
            result = ds.cluster(remote_data, features, args.clusters, output)
            ds.download(output, args.output)
            print(f"\n✅ Clustering complete")
            print(f"   Inertia: {result['data'].get('inertia', 0):.2f}")
            print(f"   Output: {args.output}")
            
        elif args.operation == "train":
            if not features or not args.target:
                raise ValueError("--features and --target required for training")
            result = ds.train_rf(remote_data, args.target, features, args.model)
            ds.download(args.model, args.model.split("/")[-1])
            print(f"\n✅ Model trained")
            print(f"   Accuracy: {result['data'].get('accuracy', 0):.4f}")
            
        elif args.operation == "predict":
            if not features:
                raise ValueError("--features required for prediction")
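            # upload() stores files under /tmp/data, so pass the returned remote
            # path to predict() rather than the original --model value.
            remote_model = ds.upload(args.model.split("/")[-1])
            output = f"/tmp/output/{os.path.basename(args.output)}"
            result = ds.predict(remote_model, remote_data, features, output)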
            ds.download(output, args.output)
            print(f"\n✅ Predictions complete")
            print(f"   Output: {args.output}")
        
        elapsed = time.time() - start
        cost = (elapsed / 3600) * ds.hourly_cost
        print(f"\n⏱️  Time: {elapsed:.1f}s | Cost: ${cost:.4f}")


if __name__ == "__main__":
    main()
```
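
The `run_script` method relies on a small convention: the wrapped script prints a single `OUT:` line carrying a JSON result, and the client ignores every other line of output. The sketch below reproduces that convention locally so it can be tried without a GPU server. It is an illustration under stated assumptions, not the script above: the wrapper omits the `cudf`, `cuml`, and `rmm` imports, and `wrap`, `parse_result`, and `run_locally` are hypothetical helper names.

```python
import json
import subprocess
import sys
import textwrap

# Hypothetical local stand-in for the remote wrapper: same OUT:<json> convention,
# but without the cudf/cuml/rmm imports so it runs on any machine.
WRAPPER = '''
import json, time
start = time.time()
result = {{"success": True, "data": {{}}}}
try:
{body}
except Exception as e:
    result["success"] = False
    result["error"] = str(e)
result["time"] = time.time() - start
print("OUT:" + json.dumps(result))
'''


def wrap(script: str) -> str:
    """Indent the user script so it sits inside the wrapper's try block."""
    return WRAPPER.format(body=textwrap.indent(script.strip("\n"), "    "))


def parse_result(stdout: str) -> dict:
    """Return the JSON payload from the first line that starts with OUT:."""
    for line in stdout.splitlines():
        if line.startswith("OUT:"):
            return json.loads(line[4:])
    return {"success": False, "error": "Parse error"}


def run_locally(script: str) -> dict:
    """Run the wrapped script in a local Python subprocess instead of over SSH."""
    proc = subprocess.run([sys.executable, "-c", wrap(script)],
                          capture_output=True, text=True)
    return parse_result(proc.stdout)


# Ordinary prints from the user script are ignored; only the OUT: line is parsed.
ok = run_locally('print("progress...")\nresult["data"]["rows"] = 3')
bad = run_locally('raise ValueError("missing column")')
print(ok["success"], ok["data"])
print(bad["success"], bad["error"])
```

Because errors are caught inside the wrapper, a failing script still yields a well-formed result with `success` set to `False`, so callers should check that flag before reading `data`.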

## Performance Comparison

| Operation            | Rows      | scikit-learn | cuML | Speedup |
| -------------------- | --------- | ------------ | ---- | ------- |
| K-Means (8 clusters) | 100M      | 180s         | 3s   | **60x** |
| Random Forest        | 100M      | 300s         | 8s   | **37x** |
| PCA (50 components)  | 100M      | 90s          | 2s   | **45x** |
| Linear Regression    | 100M      | 25s          | 0.5s | **50x** |
| DBSCAN               | 10M       | 600s         | 15s  | **40x** |
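
The Speedup column is the scikit-learn time divided by the cuML time. A minimal sketch that recomputes it from the timings in the table (the `speedup` helper is a hypothetical name, and the timings are copied from the table rather than measured here):

```python
# Timings in seconds copied from the table: (operation, scikit-learn, cuML).
TIMINGS = [
    ("K-Means (8 clusters)", 180.0, 3.0),
    ("Random Forest", 300.0, 8.0),
    ("PCA (50 components)", 90.0, 2.0),
    ("Linear Regression", 25.0, 0.5),
    ("DBSCAN", 600.0, 15.0),
]


def speedup(cpu_seconds: float, gpu_seconds: float) -> float:
    """Return how many times faster the GPU run was than the CPU run."""
    if gpu_seconds <= 0:
        raise ValueError("gpu_seconds must be positive")
    return cpu_seconds / gpu_seconds


ratios = {name: speedup(cpu, gpu) for name, cpu, gpu in TIMINGS}
for name, ratio in ratios.items():
    print(f"{name:<22} {ratio:5.1f}x")
```

The Random Forest ratio comes out at 37.5, which the table shows as 37x. Measured speedups depend on the GPU model, data types, and whether the data is already resident in GPU memory, so the same calculation is worth repeating on your own workload.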

## Cost Comparison

| Workload           | Local CPU runtime | AWS SageMaker cost | Clore.ai RAPIDS cost |
| ------------------ | ----------------- | ------------------ | -------------------- |
| 100M row analysis  | 30 min      | $2.00         | **$0.25**       |
| Model training     | 2 hours     | $8.00         | **$0.80**       |
| Daily ETL pipeline | 4 hours     | $15.00        | **$1.50**       |
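
The script's own cost figure is elapsed wall-clock time multiplied by the hourly price, and its spot bid is the listed price plus 15%. A small sketch of both calculations follows; the function names are hypothetical and the prices in the example are made-up inputs, not Clore.ai quotes:

```python
def estimate_cost(elapsed_seconds: float, hourly_price: float) -> float:
    """Cost of a job billed pro rata by the hour, as computed in main()."""
    return (elapsed_seconds / 3600) * hourly_price


def spot_bid(listed_price: float, headroom: float = 0.15) -> float:
    """Bid slightly above the listed spot price, as setup() does with 1.15x."""
    return listed_price * (1 + headroom)


# Hypothetical inputs: a 30-minute job on a server listed at $0.50/hr.
cost = estimate_cost(30 * 60, 0.50)
bid = spot_bid(0.40)
print(f"Estimated cost: ${cost:.2f}")
print(f"Spot bid for a $0.40/hr listing: ${bid:.2f}/hr")
```

This estimate uses the listed price, as the script does. If the order is charged at the bid price or billed in coarser increments, the real figure may be somewhat higher.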

## Best Practices

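1. **Use Parquet format**: it is columnar and typed, so it loads roughly 10x faster than CSV
2. **Enable managed memory**: `rmm.reinitialize(managed_memory=True)` lets working sets larger than VRAM spill to host memory
3. **Use A100/A6000** for large datasets (more VRAM)
4. **Batch operations**: keep data on the GPU between transforms instead of copying it back to the host
5. **Use spot instances**: 50-70% cheaper than on-demand for batch jobs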

## Next Steps

* [GPU ETL with cuDF](/data-processing-and-pipelines/gpu-etl.md)
* [Video Transcoding](/data-processing-and-pipelines/video-transcoding.md)
* [ML Training on Clore.ai](/machine-learning-and-training/training-scheduler.md)

