# PipeStream MCP

> PipeStream connects your AI agent directly to high-throughput data streams. You manage event logging and real-time data flow—creating new channels, publishing events instantly, or fetching historical records using natural language commands.

## Overview
- **Category:** developer-tools
- **Price:** Free
- **Tags:** event-streaming, real-time-data, data-pipelines, logging, api-integration

## Description

PipeStream connects your AI agent directly to high-throughput data streams, letting you manage event logging and real-time data flow with simple natural language commands. You don't need a whole separate monitoring suite; your agent handles it.

To start managing the system, remember that ``list_streams`` provides a complete list of every data channel currently running in PipeStream. Beyond just checking what's active, you can set up brand new channels using ``create_stream``, defining both its name and how long it should keep records—you gotta specify that retention policy upfront.

When you need to log something instantly, ``publish_event`` sends a structured JSON payload immediately to any stream ID you point it at. This is what you use for logging critical events or sending data packets that need immediate processing. It’s super fast; you just give it the target and the data.

For historical deep dives, ``fetch_events`` pulls specific records from any existing stream. You don't have to manually sift through gigabytes of garbage; you can filter these past events by a precise date range or use pagination to walk through large datasets chunk by chunk.

Think about the process: first, your agent uses ``list_streams`` to see what channels are already operational. Then, if you need a new data source, it'll run ``create_stream`` to spin up a dedicated channel and set its lifespan policy—say, 90 days of retention. Once those streams are ready, you can immediately inject live activity. To log an event, your agent just calls ``publish_event`` with the stream ID and the JSON payload; it's instant logging for auditing or processing purposes.

If you need to review what happened last Tuesday, instead of searching through a database GUI, you tell your AI client to run ``fetch_events``. You specify the time frame—like 'August 1st to August 5th'—and the system pulls those exact historical records. If there are too many results for one page load, you can guide it using pagination parameters so you get a clean, manageable batch of data every time.

This whole loop means your agent doesn't just read; it actively controls the data flow. You list streams to check status, create streams to build capacity, publish events to write activity, and fetch events to read history. It’s a closed-loop system built right into your chat window. You keep all that visibility without ever leaving your prompt. You're not just looking at logs; you're managing the entire pipeline lifecycle—from creation to real-time injection and deep historical retrieval.

## Tools

### create_stream
Sets up a brand new data channel, allowing you to define its name and how long it should keep records.

### fetch_events
Pulls specific historical events from any existing stream. You can filter this by date range or paginate through results.

### list_streams
Provides a complete list of every data channel currently active in the system.

### publish_event
Sends an immediate, structured event payload to a designated stream ID for logging or processing.

## Prompt Examples

**Prompt:** 
```
List all my active PipeStream channels.
```

**Response:** 
```
I've retrieved your active streams. You have 3 streams: 'user-signups' (ID: st_1), 'system-logs' (ID: st_2), and 'payment-events' (ID: st_3).
```

**Prompt:** 
```
Create a new stream named 'production-alerts' with 48 hours retention.
```

**Response:** 
```
Successfully created the stream 'production-alerts'. It is now active with a 48-hour data retention policy. The assigned Stream ID is st_99.
```

**Prompt:** 
```
Fetch the last 5 events from stream st_1.
```

**Response:** 
```
I've fetched the 5 most recent events from 'user-signups'. They include payload data for user registrations from the last hour. Would you like me to summarize the data?
```

## Capabilities

### Track all active streams
ListStreams shows every data channel currently running in PipeStream.

### Create a new data stream
Use create_stream to set up a dedicated, named channel and define its required retention policy.

### Inject an event into a stream
PublishEvent sends a structured JSON payload instantly to a specified stream ID.

### Get historical events
FetchEvents retrieves past records from any stream, allowing you to filter by time or page through large datasets.

## Use Cases

### Debugging a payment failure
A user notices payments are failing sporadically. They ask their agent to run `list_streams` first, finding the 'payment-events' channel. Next, they tell the agent to use `fetch_events` on that stream for the last hour, filtering by 'failure' status. The resulting payload shows the exact data field missing from the request.

### Launching a new microservice
A backend developer needs a dedicated testing channel for a new service component. They use `create_stream` to make 'testing-svc-v2' and set retention to 7 days. They then run a test payload using `publish_event`, confirming the event lands correctly before merging the code.

### Analyzing system performance
The ops team suspects resource exhaustion. The agent runs `list_streams` to identify all channels, then uses `fetch_events` on 'system-logs', paginating through thousands of records to pinpoint a specific increase in error rates over the last 30 minutes.

### Running an experiment
A data scientist wants to test a new feature's impact. They run `create_stream` for 'experiment-beta', and then use `publish_event` to send structured JSON records simulating the beta traffic, all without touching the production system.

## Benefits

- Stop jumping between dashboards. You can monitor live streams and check event logs using `list_streams` or `fetch_events`—all within one conversation.
- Handle high-volume logging instantly. Use `publish_event` to inject structured JSON payloads into a stream without writing any code or hitting an API endpoint directly.
- Control data lifecycle. When you use `create_stream`, you enforce custom retention policies, ensuring old logs don't bloat storage unnecessarily.
- Speed up incident response. During a critical outage, quickly run `fetch_events` on the relevant stream ID to see exactly what happened and when it occurred.
- Contextual data access. Pull sample or experimental data for analysis by running `fetch_events`, keeping those raw records right in your current working context.

## How It Works

The bottom line is: It lets your agent act as a direct interface to your entire logging and event infrastructure.

1. Subscribe to the PipeStream MCP Server and provide your API key.
2. Reference a tool (like `list_streams`) in your prompt, telling your agent what data you need.
3. Your AI client executes the necessary calls, giving you real-time stream status or historical event payloads directly.

## Frequently Asked Questions

**How do I check if a specific event was successfully logged using PipeStream? (pipe_stream)**
You use `fetch_events`. Simply specify the stream ID and the approximate time window. This returns the payload data, letting you confirm the record exists in the system.

**Can I create a custom log channel with PipeStream? (pipe_stream)**
Yes, run `create_stream`. You must provide a name and specify the retention policy—this defines how long that stream will keep data automatically.

**What should I use to send an event when my service calls it? (pipe_stream)**
Use `publish_event`. This tool takes a JSON payload and sends it immediately to the target stream ID, making sure the data is captured in the log.

**Does PipeStream list all my available channels? (pipe_stream)**
Yes. Run `list_streams`. This gives you a definitive, current view of every active channel name and status within your account.

**How do I authenticate when using the `publish_event` tool?**
You must include your unique PipeStream API key in the request header. This key authorizes your AI client to write data and events to any stream you manage.

**Does `fetch_events` support retrieving data only within a specific date range?**
Yes, you can use time-based filters with the `fetch_events` tool. Just specify the start and end timestamps to narrow down your historical search results efficiently.

**Can I set a data lifespan using the `create_stream` tool?**
Yes. When you run `create_stream`, you define the desired retention policy. This setting controls exactly how long PipeStream keeps event payloads before automatically removing them.

**How do I check the current status or overall health of my data channels?**
The `list_streams` tool shows all active channels, but for real-time monitoring, you can inspect the live flow of data directly in your chat interface. This lets you track activity across all streams without switching tools.

**Can I create a new data stream with a specific retention period?**
Yes! Use the `create_stream` tool. You can specify the name and the `retention_hours` to define how long data should be stored in that logical channel.

**How do I send a JSON payload to an existing stream?**
Use the `publish_event` tool. Provide the `stream_id` and your JSON `payload`. You can also optionally include a custom ISO8601 timestamp.

**Is it possible to filter events by time when fetching data?**
Absolutely. The `fetch_events` tool allows you to provide a `from_timestamp` to retrieve only the events recorded after a specific point in time.