WebSocket API
Overview
Watchers.io provides a WebSocket API based on Socket.IO for integrating bots and external clients. The API allows connecting to chat rooms, receiving messages, and sending responses in real-time.
Connection
Endpoint
wss://bot.watchers.io/
Connection Parameters
const socket = io('wss://bot.watchers.io/', {
transports: ['websocket'],
upgrade: false, // Disable automatic upgrade
auth: {
bearer: 'YOUR_BEARER_TOKEN'
}
});import socketio
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
await sio.connect(
'wss://bot.watchers.io/',
transports=['websocket'],
auth={'bearer': 'YOUR_BEARER_TOKEN'}
)Required Credentials:
bearer- JWT token for connection authorization (passed in auth during connection)apiKey- API key for performing operations (passed in each request)userId- unique identifier of the bot/client
Bearer Token Format:
JWT format: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Payload contains: { uid, project, iat }
Events
1. System Events
connect
connectTriggered when successfully connected to the server.
socket.on('connect', () => {
console.log('✅ Connected to Watchers.io');
// Subscribe to rooms here
});@sio.event
async def connect():
print('✅ Connected to Watchers.io')
# Subscribe to rooms hereconnect_error
connect_errorTriggered on connection error (invalid bearer token, network issues, etc.).
socket.on('connect_error', (error) => {
console.error('❌ Connection error:', error);
});@sio.event
async def connect_error(err):
print(f'❌ Connection error: {err}')disconnect
disconnectTriggered when connection is lost.
JavaScript:
socket.on('disconnect', () => {
console.log('🔌 Disconnected from server');
});@sio.event
async def disconnect():
print('🔌 Disconnected from server')Subscription Events (Listening)
1. external/listen
external/listenSubscribe to a specific room by its ID.
Payload:
{
roomId: string, // Room ID to subscribe to
apiKey: string // Your API key
}socket.emit("external/listen",
{
roomId: "sports-room-001",
apiKey: "api_9f8e7d6c5b4a3210"
},
(response) => {
console.log('Subscription activated:', response);
}
);await sio.emit(
'external/listen',
{
'roomId': 'sports-room-001',
'apiKey': 'api_9f8e7d6c5b4a3210'
},
callback=lambda r: print(f'Subscription activated: {r}')
)Description:
- Subscribe to a specific room
- After subscription, you will receive all messages from this room
- Used for monitoring specific rooms
2. external/listen/bot
external/listen/botGlobal subscription to all bot mentions across any rooms.
Payload:
{
apiKey: string // Your API key
}socket.emit("external/listen/bot",
{ apiKey: "api_9f8e7d6c5b4a3210" },
(response) => {
console.log('Global subscription to mentions:', response);
}
);await sio.emit(
'external/listen/bot',
{'apiKey': 'api_9f8e7d6c5b4a3210'},
callback=lambda r: print(f'Global subscription to mentions: {r}')
)Description:
- Does not require specifying a specific room
- Bot receives messages only when mentioned (@botname)
- Works for all public rooms where the bot has access
- The
messageevent will contain abotNamefield
3. external/listen/personal
external/listen/personalGlobal subscription to all personal chats with the bot.
Payload:
{
apiKey: string // Your API key
}socket.emit("external/listen/personal",
{ apiKey: "api_9f8e7d6c5b4a3210" },
(response) => {
console.log('Subscription to personal messages:', response);
}
);await sio.emit(
'external/listen/personal',
{'apiKey': 'api_9f8e7d6c5b4a3210'},
callback=lambda r: print(f'Subscription to personal messages: {r}')
)Description:
- Does not require specifying a specific room
- Bot receives all messages from personal chats
- Rooms have the format
personal.chatbot.{userId} - Bot mention is not required
Incoming Messages
message
messageReceive an incoming message from subscribed rooms.
Data Structure:
{
project: stirng
roomId: string
userId: string
message: {id: number, text: string, sticker?: string}
botName?: string
replyTo?: { id: number, text: string, sticker?: string}
}
}Example:
socket.on('message', (data) => {
console.log('New message:', data);
const { roomId, userId, message, botName } = data;
const text = message?.text || '';
const messageId = message?.id;
// Check room type
const isPersonal = roomId.startsWith('personal.chatbot.');
// Check mention
const wasMentioned = !!botName;
// Process...
});@sio.event
async def message(data):
print(f'New message: {data}')
room_id = data.get('roomId', '')
user_id = data.get('userId', '')
msg = data.get('message', {})
bot_name = data.get('botName')
text = msg.get('text', '').strip()
message_id = msg.get('id')
# Check room type
is_personal = room_id.startswith('personal.chatbot.')
# Check mention
was_mentioned = bool(bot_name)
# Process...Important Notes:
- The
botNamefield is present only when the bot is explicitly mentioned - Ignore messages where
userIdmatches your bot (own messages) - Personal rooms always start with the prefix
personal.chatbot.
Sending Messages
external/send
external/sendSend a message to a room.
Payload:
{
roomId: string, // Room ID
apiKey: string, // Your API key
userId: string, // Your bot ID
text: string, // Message text
mentionMessageId?: number // Message ID to reply to in thread (optional)
}Example:
socket.emit("external/send",
{
roomId: "sports-room-001",
apiKey: "api_9f8e7d6c5b4a3210",
userId: "bot_assistant_xyz",
text: "Hello! This is my response.",
mentionMessageId: 12345 // Reply to specific message
},
(status) => {
console.log('Send status:', status);
}
);await sio.emit(
'external/send',
{
'roomId': 'sports-room-001',
'apiKey': 'api_9f8e7d6c5b4a3210',
'userId': 'bot_assistant_xyz',
'text': 'Hello! This is my response.',
'mentionMessageId': 12345 # Reply to specific message
},
callback=lambda status: print(f'Send status: {status}')
)Parameters:
roomId- required, destination room IDapiKey- required, your API keyuserId- required, your bot IDtext- required, message text to sendmentionMessageId- optional, if specified - creates a reply in thread
Features:
- In personal rooms,
mentionMessageIdis usually not used or passed asnull - In public rooms,
mentionMessageIdallows creating a thread (reply to message) - Callback returns send status (see "Response Handling" section)
Room Types
1. Regular (Public) Rooms
- ID Format: arbitrary string or number (
"sports-room-001","general","123") - Subscription:
external/listen(specific room) orexternal/listen/bot(global) - Response Trigger: bot mention
@botnamerequired (when usinglisten/bot) - Threads: supported via
mentionMessageId
2. Personal Rooms
- ID Format:
personal.chatbot.{userId} - Subscription:
external/listen/personal(global for all personal chats) - Response Trigger: all messages, mention not required
- Threads: usually not used (mentionMessageId = null)
Working with Mentions
Detecting Mentions
A bot is considered mentioned if:
- The
messageevent contains abotNamefield - The message text starts with
@{botName}(case-insensitive)
Cleaning Text from Mentions
function stripBotMention(text, botName) {
if (!botName) return text;
// Remove @botname at the beginning + separators (spaces, colons, etc.)
const pattern = new RegExp(`^@?${botName}[,\\s:;-]*`, 'i');
return text.replace(pattern, '').trim();
}
// Example
stripBotMention("@SportsBot: what's the weather?", "SportsBot")
// Result: "what's the weather?"import re
def strip_bot_mention(text: str, bot_name: str) -> str:
if not bot_name:
return text
# Remove @botname at the beginning + separators
pattern = rf'^@?{re.escape(bot_name)}[,\s:;-]*'
return re.sub(pattern, '', text, flags=re.IGNORECASE).strip()
# Example
strip_bot_mention("@SportsBot: what's the weather?", "SportsBot")
# Result: "what's the weather?"Complete Client Examples
Client
const io = require('socket.io-client');
// Configuration
const WS_URL = 'wss://bot.watchers.io/';
const BEARER_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJhYmMxMjMiLCJwcm9qZWN0IjoicHJvZCIsImlhdCI6MTYwMDAwMDAwMH0.xyz789';
const API_KEY = 'api_9f8e7d6c5b4a3210';
const BOT_ID = 'bot_assistant_xyz';
const BOT_NAME = 'SportsBot';
// Connection
const socket = io(WS_URL, {
transports: ['websocket'],
upgrade: false,
auth: { bearer: BEARER_TOKEN }
});
// Connection handler
socket.on('connect', () => {
console.log('✅ Connected to Watchers.io');
// Subscribe to mentions in public rooms
socket.emit('external/listen/bot', { apiKey: API_KEY }, (resp) => {
console.log('Subscription to mentions:', resp);
});
// Subscribe to personal chats
socket.emit('external/listen/personal', { apiKey: API_KEY }, (resp) => {
console.log('Subscription to personal chats:', resp);
});
// Optional: subscribe to specific room
socket.emit('external/listen', {
roomId: 'sports-room-001',
apiKey: API_KEY
}, (resp) => {
console.log('Subscription to room:', resp);
});
});
// Message handler
socket.on('message', async (data) => {
console.log('📨 New message:', data);
const { roomId, userId, message, botName } = data;
// Ignore own messages
if (userId === BOT_ID) {
console.log('↪ Skip: own message');
return;
}
const text = message?.text || '';
const messageId = message?.id;
if (!text) {
console.log('↪ Skip: empty message');
return;
}
// Determine room type
const isPersonal = roomId.startsWith('personal.chatbot.');
const wasMentioned = !!botName;
// Decide if we should respond
const shouldRespond = isPersonal || wasMentioned;
if (!shouldRespond) {
console.log('↪ Skip: not mentioned in public room');
return;
}
// Clean text from mention
let cleanText = text;
if (wasMentioned) {
cleanText = text.replace(
new RegExp(`^@?${BOT_NAME}[,\\s:;-]*`, 'i'),
''
).trim();
}
// Process request
console.log(`🤖 Processing: "${cleanText}"`);
const response = await processMessage(cleanText);
// Send response
socket.emit('external/send', {
roomId,
apiKey: API_KEY,
userId: BOT_ID,
text: response,
mentionMessageId: !isPersonal ? messageId : null
}, (status) => {
console.log('✅ Message sent:', status);
});
});
// Error handlers
socket.on('connect_error', (error) => {
console.error('❌ Connection error:', error.message);
});
socket.on('disconnect', (reason) => {
console.log('🔌 Disconnected:', reason);
});
// Example processing function
async function processMessage(text) {
// Your message processing logic
return `You said: ${text}`;
}import os
import asyncio
import socketio
import re
from typing import Optional
# Configuration
WS_URL = 'wss://bot.watchers.io/'
BEARER_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJhYmMxMjMiLCJwcm9qZWN0IjoicHJvZCIsImlhdCI6MTYwMDAwMDAwMH0.xyz789'
API_KEY = 'api_9f8e7d6c5b4a3210'
BOT_ID = 'bot_assistant_xyz'
BOT_NAME = 'SportsBot'
# Create Socket.IO client
sio = socketio.AsyncClient(logger=False, engineio_logger=False)
def strip_bot_mention(text: str, bot_name: Optional[str]) -> str:
"""Remove bot mention from the beginning of text."""
if not text or not bot_name:
return text
return re.sub(
rf'^@?{re.escape(bot_name)}[,\s:;-]*',
'',
text,
flags=re.IGNORECASE
).strip()
async def safe_send(room: str, text: str, mention_id: Optional[int], is_personal: bool):
"""Send message to room."""
payload = {
'roomId': room,
'apiKey': API_KEY,
'userId': BOT_ID,
'text': text,
}
if not is_personal and mention_id:
payload['mentionMessageId'] = mention_id
try:
await sio.emit(
'external/send',
payload,
callback=lambda status: print(f'✅ Message sent: {status}')
)
except Exception as e:
print(f'❌ Send error: {e}')
@sio.event
async def connect():
"""Handle connection."""
print('✅ Connected to Watchers.io')
# Subscribe to bot mentions
await sio.emit(
'external/listen/bot',
{'apiKey': API_KEY},
callback=lambda r: print(f'Subscription to mentions: {r}')
)
# Subscribe to personal chats
await sio.emit(
'external/listen/personal',
{'apiKey': API_KEY},
callback=lambda r: print(f'Subscription to personal: {r}')
)
@sio.event
async def connect_error(err):
"""Handle connection error."""
print(f'❌ Connection error: {err}')
@sio.event
async def disconnect():
"""Handle disconnection."""
print('🔌 Disconnected')
@sio.event
async def message(data):
"""Handle incoming message."""
print(f'📨 New message: {data}')
room_id = data.get('roomId', '')
user_id = data.get('userId', '')
msg = data.get('message', {})
bot_name = data.get('botName')
text = (msg.get('text') or '').strip()
message_id = msg.get('id')
# Ignore own messages
if user_id == BOT_ID or not text:
print('↪ Skip: own or empty message')
return
# Determine room type and mention
is_personal = room_id.startswith('personal.chatbot.')
was_mentioned = bool(bot_name)
# Decide if we should respond
if not (is_personal or was_mentioned):
print('↪ Skip: not mentioned in public room')
return
# Clean text from mention
clean_text = strip_bot_mention(text, bot_name if was_mentioned else None)
# Process asynchronously
async def _worker():
try:
response = await process_message(clean_text or text)
except Exception as e:
print(f'❌ Processing error: {e}')
response = 'Sorry, I encountered an error processing your request.'
print(f'🤖 Sending response: {len(response)} chars')
await safe_send(room_id, response, message_id, is_personal)
asyncio.create_task(_worker())
async def process_message(text: str) -> str:
"""Process incoming message and generate response."""
# Your message processing logic here
await asyncio.sleep(0.1) # Simulate processing
return f'You said: {text}'
async def main():
"""Main entry point."""
print('🔌 Connecting to Watchers.io...')
await sio.connect(
WS_URL,
transports=['websocket'],
auth={'bearer': BEARER_TOKEN}
)
await sio.wait()
if __name__ == '__main__':
asyncio.run(main())Response Handling (Callbacks)
All emit operations support callbacks to receive status:
JavaScript:
socket.emit('external/send', payload, (response) => {
if (response?.success) {
console.log('✅ Operation successful');
} else if (response?.error) {
console.error('❌ Error:', response.error);
} else {
console.warn('⚠️ Unknown response:', response);
}
});def callback(response):
if response and response.get('success'):
print('✅ Operation successful')
elif response and response.get('error'):
print(f'❌ Error: {response["error"]}')
else:
print(f'⚠️ Unknown response: {response}')
await sio.emit('external/send', payload, callback=callback)Possible Response Formats:
{ success: true }- successful{ success: false, error: "..." }- error with descriptionnull/undefined- no response (timeout)
Best Practices
1. Security
- Never store bearer token and API key in client-side code
- Use environment variables for storing credentials
- Regularly rotate tokens
2. Error Handling
socket.on('connect_error', (error) => {
console.error('Connection failed:', error);
// Reconnection logic
});
socket.on('error', (error) => {
console.error('Socket error:', error);
});@sio.event
async def connect_error(err):
print(f'Connection failed: {err}')
# Reconnection logic
@sio.event
async def error(err):
print(f'Socket error: {err}')3. Reconnection
Socket.IO automatically reconnects, but you need to resubscribe:
socket.on('connect', () => {
// Resubscribe after reconnection
subscribeToChannels();
});@sio.event
async def connect():
# Resubscribe after reconnection
await subscribe_to_channels()4. Preventing Duplicates
const processedMessages = new Set();
socket.on('message', (data) => {
const msgId = data.message?.id;
if (processedMessages.has(msgId)) {
return; // Already processed
}
processedMessages.add(msgId);
// Limit Set size
if (processedMessages.size > 1000) {
const first = processedMessages.values().next().value;
processedMessages.delete(first);
}
// Process...
});processed_messages = set()
@sio.event
async def message(data):
msg_id = data.get('message', {}).get('id')
if msg_id in processed_messages:
return # Already processed
processed_messages.add(msg_id)
# Limit set size
if len(processed_messages) > 1000:
processed_messages.pop()
# Process...5. Rate Limiting
class MessageQueue {
constructor() {
this.queue = [];
this.processing = false;
}
async add(payload) {
this.queue.push(payload);
if (!this.processing) {
await this.process();
}
}
async process() {
this.processing = true;
while (this.queue.length > 0) {
const payload = this.queue.shift();
socket.emit('external/send', payload);
await new Promise(resolve => setTimeout(resolve, 1000)); // 1 sec delay
}
this.processing = false;
}
}
const messageQueue = new MessageQueue();import asyncio
from collections import deque
class MessageQueue:
def __init__(self):
self.queue = deque()
self.processing = False
async def add(self, payload):
self.queue.append(payload)
if not self.processing:
await self.process()
async def process(self):
self.processing = True
while self.queue:
payload = self.queue.popleft()
await sio.emit('external/send', payload)
await asyncio.sleep(1.0) # 1 sec delay
self.processing = False
message_queue = MessageQueue()6. Logging
const DEBUG = process.env.DEBUG === 'true';
function log(level, ...args) {
if (DEBUG || level === 'error') {
console[level](...args);
}
}
socket.on('message', (data) => {
log('info', '📨 Message received:', data);
});import logging
import os
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'
logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO)
logger = logging.getLogger(__name__)
@sio.event
async def message(data):
logger.info(f'📨 Message received: {data}')Environment Variables
# Required
WS_URL=wss://bot.watchers.io/
WATCHERS_BOT_ID=bot_assistant_xyz
WATCHERS_API_KEY=api_9f8e7d6c5b4a3210
WATCHERS_BEARER_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
# Optional
WATCHERS_BOT_NAME=SportsBot
RESPOND_TO_ALL_ROOMS=false
LOCAL_HISTORY_FOR_PERSONAL=false
DEBUG=falseTroubleshooting
Issue: Cannot Connect
- Verify bearer token is correct
- Ensure using
transports: ['websocket'] - Check endpoint availability
Issue: Not Receiving Messages
- Ensure subscription is performed (
external/listen/*) - Verify apiKey is correct
- Check callback responses in logs after emit
Issue: Messages Not Sending
- Check all required fields: roomId, apiKey, userId, text
- Ensure userId matches your bot
- Check callback for send status
Issue: Duplicate Messages
- Implement checking by message.id
- Ensure not subscribing twice to the same room
Versioning
- Socket.IO Client: >= 4.6.2
- Transport: WebSocket only (no polling)
- Auth method: Bearer token in auth object
Updated about 12 hours ago