Telegram groups and spam issues
Telegram is one of the most popular messaging applications in the world. Personally, I am an active member of various groups that discuss programming languages and challenges. However, these groups are frequently infiltrated by spam accounts that post unsolicited messages, many of them related to scams.
How can a bot combat spam?
A bot, as it is called on the platform (or a robot, if you prefer), is an account that can be automated through the official Bot API, which is documented on Telegram’s website.
Combatting spam with Telegram bots
There are various methods to combat spam, ranging from simple to elaborate. The method we will discuss today is a middle ground. The idea is to create a Telegram bot that presents a captcha to a newly joined group member. If the member sends a message of any type that is not the correct response to the captcha, it will be immediately deleted. Once they successfully respond to the captcha, they are permitted to send any message. This method is effective because many spammer accounts lack the ability to respond to a captcha.
Webhooks
Webhooks are a mechanism that enables real-time communication between different applications or systems over the internet. They allow one application to send data to another application as soon as a specific event occurs. This event-driven approach is commonly used for automating tasks, integrating services, and keeping data synchronized between different platforms. Webhooks eliminate the need for continuous polling by making data transmission instantaneous when a predefined trigger event occurs.
To reduce costs and avoid paying for a virtual machine that runs the bot continuously, we will use webhooks. With serverless cloud functions (such as AWS Lambda), we pay only for the invocations we actually use.
Polling is not an option in this setup, because a Lambda function only runs when something invokes it. Instead, we will set up a webhook in Telegram that calls the public endpoint of the API Gateway, which in turn triggers the Lambda function handler.
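To make the webhook flow concrete, here is a minimal sketch of what a function behind the public endpoint has to do: read the JSON body that Telegram POSTs and pull out the fields it cares about. The `handle_webhook` name and the shape of the returned dictionary are assumptions made for illustration, and the sample payload is trimmed to a handful of fields. The real bot shown later delegates this parsing to python-telegram-bot.

```python
import json


def handle_webhook(event: dict) -> dict:
    """Hypothetical Lambda-style handler: one invocation per Telegram update."""
    body = event.get("body")
    if not body:
        # Nothing to process; acknowledge the request anyway.
        return {"statusCode": 200, "handled": False}

    update = json.loads(body)
    message = update.get("message", {})
    return {
        "statusCode": 200,
        "handled": True,
        "chat_id": message.get("chat", {}).get("id"),
        "user_id": message.get("from", {}).get("id"),
        "text": message.get("text"),
    }


# A trimmed example of the JSON sent for a text message (illustrative values).
sample_event = {
    "body": json.dumps(
        {
            "update_id": 1,
            "message": {
                "message_id": 10,
                "chat": {"id": -100123, "type": "supergroup"},
                "from": {"id": 42, "is_bot": False, "first_name": "Ada"},
                "text": "hello",
            },
        }
    )
}

result = handle_webhook(sample_event)
```

No loop is waiting for updates here: the function does nothing until the gateway invokes it with an event, which is what makes the pay-per-use model possible.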
The plan
Here’s what we’re going to do: we’ll have two Lambdas - one written in Go and the other in Python. The Python Lambda will handle the Telegram API, which is essentially the logic behind our Telegram bot. When a user enters, it will generate a four-letter word (the captcha value) and store it in Redis. Meanwhile, the Go service will create the captcha image for its value, and provide a public URL to access it from. This keeps the Python Lambda focused on one task. Although it would be possible to generate the image in Python and unify the code, I encountered issues with linking Pillow, an image manipulation library, with Lambda runtime libraries when using the Serverless Framework. The only viable option I found was using Lambda layers, which I decided to avoid.
Next, there is another handler that deals with any messages sent by users. These messages can be text, images, GIFs, audio, or anything else. The handler makes a call to Redis to search for the user’s captcha cipher. If it is not found, the routine ends. However, if it is found, the handler checks whether the message corresponds to the captcha. If it does not match, the message is deleted. If it matches, the Redis key is deleted, and the user is welcomed within the channel. Finally, we have a handler that is triggered when a user leaves the group. The Redis key is deleted to prevent any lingering data in Redis.
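The three handlers described above can be sketched with a plain dictionary standing in for Redis. This is only an illustration of the decision logic under that assumption: the `CaptchaGate` class and its method names are hypothetical, and the returned strings stand for the Telegram calls the real bot makes.

```python
import random
import re
import string
from typing import Dict, Optional, Tuple


class CaptchaGate:
    """In-memory stand-in for the Redis-backed flow (hypothetical helper)."""

    def __init__(self) -> None:
        # Maps (chat_id, user_id) to the pending captcha value.
        self.pending: Dict[Tuple[int, int], str] = {}

    def on_join(self, chat_id: int, user_id: int) -> str:
        """Generate a four-letter captcha value and remember it."""
        cipher = "".join(random.sample(string.ascii_uppercase, 4))
        self.pending[(chat_id, user_id)] = cipher
        return cipher

    def on_message(self, chat_id: int, user_id: int, text: Optional[str]) -> str:
        """Return the action to take: 'ignore', 'delete' or 'welcome'."""
        cipher = self.pending.get((chat_id, user_id))
        if cipher is None:
            return "ignore"  # Verified or long-standing member.
        # Non-text messages (text is None) never match the captcha.
        if not text or re.sub(r"\s+", "", text).upper() != cipher:
            return "delete"
        del self.pending[(chat_id, user_id)]
        return "welcome"

    def on_leave(self, chat_id: int, user_id: int) -> None:
        """Drop any pending captcha so that no data lingers."""
        self.pending.pop((chat_id, user_id), None)


gate = CaptchaGate()
answer = gate.on_join(chat_id=1, user_id=7)

# A second user joins and leaves without answering.
gate.on_join(chat_id=1, user_id=8)
gate.on_leave(chat_id=1, user_id=8)
```

Keying the state by both chat and user means the same account can be pending in one group and verified in another.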
The Code
Generating captcha on-the-fly
Let’s begin with the service that generates the image. It’s important to remember that the service should provide a public URL, as the Telegram API requires a public URL. Internally, the API will download the image and distribute it.
// Directive that instructs the compiler to embed the font into the binary.
//go:embed arial.ttf
var arial []byte

// AWS Lambda Go entrypoint.
func Handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	var text = request.QueryStringParameters["text"]
	var (
		width, height = 200, 100
		// Create the "buffer" that will be used for drawing beneath the text.
		captcha = image.NewRGBA(image.Rect(0, 0, width, height))
	)
	// Fill the image with a white background.
	draw.Draw(captcha, captcha.Bounds(), image.White, image.Point{}, draw.Src)
	var (
		f   *opentype.Font
		err error
	)
	// Open and parse the embedded font in the binary.
	if f, err = opentype.Parse(arial); err != nil {
		panic(err)
	}
	var (
		face     font.Face
		fontSize = 24.0
	)
	// Instantiate the font with the provided parameters.
	if face, err = opentype.NewFace(f, &opentype.FaceOptions{Size: fontSize, DPI: 72}); err != nil {
		panic(err)
	}
	defer face.Close()
	// Declaration of multiple variables and some calculations.
	var (
		drawer = &font.Drawer{
			Dst:  captcha,
			Src:  image.NewUniform(color.Black),
			Face: face,
		}
		totalTextWidth = font.MeasureString(face, text).Ceil()
		spacing        = (width - totalTextWidth) / (len(text) + 1)
		x              = spacing
		y              = (height + int(fontSize)) / 2
	)
	// Write the text in a centered and spaced manner.
	for _, char := range text {
		charWidth := font.MeasureString(face, string(char)).Ceil()
		drawer.Dot = fixed.Point26_6{X: fixed.I(x), Y: fixed.I(y)}
		drawer.DrawString(string(char))
		x += charWidth + spacing
	}
	// Encode the captcha's bitmap in PNG format.
	var buffer bytes.Buffer
	if err = png.Encode(&buffer, captcha); err != nil {
		panic(err)
	}
	// As we are using the API v2, also called HTTP API, which is faster and cheaper than v1, we should return the response in base64 encoded format.
	// Reference: https://www.serverless.com/framework/docs/providers/aws/events/http-api
	return events.APIGatewayProxyResponse{
		StatusCode:      200,
		Headers:         map[string]string{"Content-Type": "image/png"},
		Body:            base64.StdEncoding.EncodeToString(buffer.Bytes()),
		IsBase64Encoded: true,
	}, nil
}
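The spacing arithmetic in the drawing loop is easier to follow with concrete numbers. The sketch below redoes the same calculation in Python, assuming each glyph has a known pixel width; the `glyph_positions` and `baseline` names are hypothetical, and summing per-glyph widths only approximates what `font.MeasureString` returns for the whole string.

```python
from typing import List


def glyph_positions(width: int, char_widths: List[int]) -> List[int]:
    """Return the x coordinate at which each glyph starts."""
    total_text_width = sum(char_widths)
    # Same formula as the Go handler: n glyphs leave n + 1 gaps.
    # int() truncates toward zero, like integer division in Go.
    spacing = int((width - total_text_width) / (len(char_widths) + 1))
    positions = []
    x = spacing
    for char_width in char_widths:
        positions.append(x)
        x += char_width + spacing
    return positions


def baseline(height: int, font_size: int) -> int:
    """Vertical position of the text baseline, as computed in the handler."""
    return (height + font_size) // 2


# Four glyphs of 16 px each on the 200 px wide canvas.
positions = glyph_positions(200, [16, 16, 16, 16])
```

With these example widths the gap is 27 px, so the glyphs start at 27, 70, 113 and 156, and the baseline of a 24 pt font on a 100 px tall image sits at y = 62.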
The actual bot
This will be the actual bot, responsible for receiving and processing webhooks from Telegram’s servers.
To make things easier for us, we’re using python-telegram-bot. I personally highly recommend it if you’re planning to create bots.
Telegram handlers
The python-telegram-bot library provides a clean way to define handlers, with three in particular that we will use. We will define on_enter to handle when the user enters, on_leave to handle when they leave, and on_message to capture all messages not covered by the previous handlers.
application = (
    Application.builder().token(os.environ["TELEGRAM_TOKEN"]).updater(None).build()
)
application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, on_enter))
application.add_handler(MessageHandler(filters.StatusUpdate.LEFT_CHAT_MEMBER, on_leave))
application.add_handler(
    MessageHandler(
        filters.ALL
        & ~filters.StatusUpdate.NEW_CHAT_MEMBERS
        & ~filters.StatusUpdate.LEFT_CHAT_MEMBER,
        on_message,
    )
)
Now, the implementation of each one:
async def on_enter(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handler responsible for the onboarding of new users, which can be either a single entry or an array.
    """
    message = update.message
    if not message:
        return
    for user in message.new_chat_members:
        if not user:
            continue
        if user.is_bot:
            continue
        # Generate the cipher, in this case, we use only four uppercase letters.
        cipher = "".join(random.sample(string.ascii_uppercase, 4))
        # Generate the public URL of the captcha with the cipher.
        # The environment variable 'ENDPOINT' comes from the Serverless configuration, which we will see next, but essentially, it is the endpoint of the Go Lambda.
        url = "?".join([os.environ["ENDPOINT"], urlencode({"text": cipher})])
        caption = "Woof! In order for your entry to be accepted into the group, please answer the captcha."  # noqa
        # Send the photo as a reply to the user's join message (reply_photo), so that they receive a notification.
        response = await message.reply_photo(url, caption=caption)
        # Since we will be performing multiple operations in Redis, and one operation doesn't depend on another, we will use a pipeline. A pipeline works similarly to a transaction, grouping all operations into a single call.
        # We save the cipher, the captcha message ID, and the join message ID in Redis so that we can delete the captcha as soon as it's answered and also delete the join message in case the user leaves without responding, to keep the group clean.
        pipe = redis.pipeline()
        pipe.set(f"ciphers:{message.chat_id}:{user.id}", cipher)
        pipe.set(f"messages:{message.chat_id}:{user.id}", response.id)
        pipe.set(f"joins:{message.chat_id}:{user.id}", message.id)
        await pipe.execute()
async def on_leave(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handles when a single user leaves.
    """
    message = update.message
    if not message:
        return
    user = message.left_chat_member
    if not user:
        return
    if user.is_bot:
        return
    # As explained above, we perform the Redis operations in batches.
    pipe = redis.pipeline()
    pipe.get(f"messages:{message.chat_id}:{user.id}")
    pipe.get(f"joins:{message.chat_id}:{user.id}")
    pipe.delete(f"ciphers:{message.chat_id}:{user.id}")
    pipe.delete(f"messages:{message.chat_id}:{user.id}")
    pipe.delete(f"joins:{message.chat_id}:{user.id}")
    message_id, join_id, *_ = await pipe.execute()
    # Members who joined before the bot was added have no stored IDs, so only delete what exists.
    deletions = [message.delete()]
    for stored_id in (message_id, join_id):
        if stored_id:
            deletions.append(
                context.bot.delete_message(
                    chat_id=message.chat_id, message_id=stored_id.decode()
                )
            )
    # As we're using asyncio, let's take advantage of it by performing some operations in parallel.
    await asyncio.gather(*deletions)
async def on_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Manage every message that has been sent to the group.
    """
    message = update.message
    if not message:
        return
    user = message.from_user
    if not user:
        return
    cipher = await redis.get(f"ciphers:{message.chat_id}:{user.id}")
    # If there's no cipher, the captcha has already been answered or the message is from a longstanding member.
    if not cipher:
        return
    text = message.text
    # Compare the user's text against the cipher, ignoring whitespace and case.
    # If the message is not of text type, delete it.
    if not text or cipher.decode() != re.sub(r"\s+", "", text).upper():
        # Delete user's message.
        await message.delete()
        return
    message_id = await redis.get(f"messages:{message.chat_id}:{user.id}")
    # The user guessed correctly, so we remove the cipher from Redis and delete the captcha and the answer to prevent flooding.
    await asyncio.gather(
        context.bot.delete_message(
            chat_id=message.chat_id,
            message_id=message_id.decode(),
        ),
        redis.delete(f"ciphers:{message.chat_id}:{user.id}"),
        message.delete(),
    )
    # Not every account has a username, so fall back to the full name.
    mention = f"[{user.username or user.full_name}](tg://user?id={user.id})"
    # Finally, welcome the new verified user.
    await context.bot.send_message(
        message.chat_id,
        f"{mention}, welcome to the group! Au!",
        parse_mode=ParseMode.MARKDOWN,
    )
Handling the Webhooks
The AWS Lambda Python runtime expects a synchronous handler function, but we want to use async to make our bot more efficient by letting it perform multiple tasks concurrently.
To achieve this, the Lambda’s entrypoint is the telegram function, a regular synchronous function that runs an asynchronous function to completion using asyncio’s run_until_complete. The bot’s real entrypoint is therefore the main function, which deserializes the JSON received from Telegram, processes the update, and calls the matching bot handlers if needed.
async def main(event: APIGatewayProxyEventV1):
    body = event["body"]
    if not body:
        return
    # Parse the JSON from the HTTP request body and hand the update to the library for processing.
    async with application:
        await application.process_update(
            Update.de_json(json.loads(body), application.bot)
        )
import hmac
def equals(left, right):
    """
    Secure string comparison against timing attacks.
    Reference: https://sqreen.github.io/DevelopersSecurityBestPractices/timing-attack/python
    """
    if not left or not right:
        return False
    # A loop that returns at the first mismatch leaks timing information;
    # hmac.compare_digest from the standard library compares in constant time.
    return hmac.compare_digest(left.encode(), right.encode())
def telegram(event: APIGatewayProxyEventV1, context: Context):
    # We set up a secret token in the Telegram webhook. From that moment on, Telegram will send the token in every webhook request. We must compare it to ensure that the request is coming from Telegram.
    if not equals(
        event["headers"].get("x-telegram-bot-api-secret-token"),
        os.environ["SECRET"],
    ):
        return {
            "statusCode": 401,
        }
    # Process the incoming request.
    asyncio.get_event_loop().run_until_complete(main(event))
    return {
        "statusCode": 200,
    }
The Serverless side
I’m a big fan of Infrastructure as Code (IaC), and the Serverless framework is a great tool for achieving it. With this framework, I can have my application’s code alongside its deployment. If necessary, I can extend it using CloudFormation, but that’s not the case here. Our use case is simple: we’ll have an internet-exposed API Gateway and two lambdas.
service: sheriff-labrador-captcha
frameworkVersion: "3"
configValidationMode: error # Fail the deployment on any configuration issue.
provider:
  name: aws
  region: us-east-1 # I know, this is the most crowded region.
  architecture: arm64 # ARM64 AWS Lambdas are more cost-effective and efficient.
  stage: development
functions:
  telegram: # Our Python handler
    runtime: python3.10
    handler: handler.telegram
    events:
      - httpApi:
          path: /webhook
          method: post
    environment:
      ENDPOINT: !GetAtt HttpApi.ApiEndpoint # This is how we avoid using any hardcoded value; we retrieve the public endpoint from the API Gateway using the `GetAtt` function.
      REDIS_DSN: ${env:REDIS_DSN} # We will utilize Redis from UpStash instead of AWS because it is more cost-effective.
      TELEGRAM_TOKEN: ${env:TELEGRAM_TOKEN} # The token of our bot. It's necessary to talk to the BotFather to create a new bot.
  captcha: # Our Go handler
    runtime: provided.al2 # We're using Amazon Linux 2 because the go1.x runtime has been deprecated. The provided.al2 runtime is the recommended replacement.
    handler: bootstrap # On provided.al2, your binary should be named "bootstrap".
    events:
      - httpApi:
          path: /
          method: get
plugins:
  - serverless-python-requirements # Necessary to install the Python dependencies listed in requirements.txt.
Deploy
In addition to being a big fan of Infrastructure as Code, I am also a strong advocate for Continuous Integration/Continuous Delivery (CI/CD). Therefore, we will utilize a GitHub Action to handle the deployment.
First, we need a few things: AWS credentials, namely AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. You can generate these in IAM. I recommend creating a restricted user for the bot. Both should be stored in the Action’s “secrets.”
You will also need to create a Redis database on UpStash. Create a secret with the key REDIS_DSN and the value provided by UpStash. If you choose to use SSL, change the protocol in the URL from “redis” to “rediss” (with two s’s at the end).
Lastly, we need the bot token. This can be generated using BotFather on Telegram itself. Create another GitHub Secret with the key TELEGRAM_TOKEN and the value provided by BotFather.
name: Deploy
on:
  push:
    branches:
      - main
jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      # We only need Python to run the code health tools, such as Black for checking indentation, style, and other code issues; MyPy for identifying typing problems; and Ruff, which I consider to be a better and faster alternative to Flake8.
      - name: Use Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: pip
      - name: Install Dependencies
        run: pip install -r requirements.txt
      - name: Install Tools
        run: pip install black mypy ruff
      - name: Lint Python
        run: |
          black --check handler.py
          ruff check handler.py
          mypy handler.py
      # We need Go to run the linting tools; Go is also required in the next job to build the binary.
      - name: Use Go
        uses: actions/setup-go@v4
        with:
          cache-dependency-path: go.sum
          go-version-file: go.mod
      # golangci-lint is an excellent tool for linting.
      - name: Lint Go
        uses: golangci/golangci-lint-action@v3
        with:
          args: --timeout=8m
          skip-pkg-cache: true # workaround
  deploy:
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      # It's time to build the Go binary. We'll utilize some flags to make the binary as lean and compact as possible. Additionally, we'll disable the AWS SDK's RPC since it won't be utilized. This optimization will help reduce cold start times.
      - name: Build Go Lambda Function
        run: |
          go get -u -t -d -v ./...
          go mod tidy
          go mod vendor
          go build -ldflags="-s -w" -trimpath -tags lambda.norpc -o bootstrap main.go
        env:
          GOARCH: arm64
          GOOS: linux
      - name: Cache Node Modules
        uses: actions/cache@v3
        with:
          key: npm
          path: ~/.npm
      # The Serverless framework is written in Node.js, so we need to install Node.js, the Serverless CLI itself, and the plugin for installing dependencies from the `requirements.txt` file.
      - name: Use Node.js
        uses: actions/setup-node@v3
        with:
          node-version: "20"
      - name: Install Serverless Framework
        run: npm install -g serverless serverless-python-requirements
      # If all variables are configured and accurate, the deployment will proceed.
      - name: Deploy
        run: serverless deploy
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          REDIS_DSN: ${{ secrets.REDIS_DSN }}
          TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
      - name: Set Webhook
        run: |
          URL=$(serverless info --verbose --stage development | grep "/webhook" | grep -Eo "https://[^ ]+")
          echo "Setting webhook to ${URL}"
          curl "https://api.telegram.org/bot${TELEGRAM_TOKEN}/setWebhook?url=${URL}"
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          REDIS_DSN: ${{ secrets.REDIS_DSN }}
          TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
Security
For Telegram to access the webhook URL, it must be publicly available. However, leaving it completely unprotected is not advisable. There are certain measures that can be taken:
- Use a secret string in the URL path; it can even be the Bot’s token, which is commonly done.
- Verify the source IP of the request; Telegram has fixed IPs from which it makes HTTP calls.
- Use the secret_token parameter in the setWebhook call with a secret token, and then verify it in the X-Telegram-Bot-Api-Secret-Token header, as the telegram entrypoint shown earlier does. This is the most secure approach in my opinion.
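As a rough sketch of the secret-token approach, the snippet below builds a setWebhook request URL that carries the secret_token parameter and checks the header on an incoming request. The helper names (`new_secret_token`, `set_webhook_url`, `is_from_telegram`) and the example values are assumptions made for illustration; only the setWebhook method, its url and secret_token parameters, and the header name come from the Bot API.

```python
import hmac
import secrets
from urllib.parse import urlencode


def new_secret_token() -> str:
    """Random token made only of letters, digits, '-' and '_'."""
    return secrets.token_urlsafe(32)


def set_webhook_url(bot_token: str, webhook_url: str, secret_token: str) -> str:
    """Build the setWebhook request URL, including the secret_token parameter."""
    query = urlencode({"url": webhook_url, "secret_token": secret_token})
    return f"https://api.telegram.org/bot{bot_token}/setWebhook?{query}"


def is_from_telegram(headers: dict, expected: str) -> bool:
    """Check the secret token header of an incoming webhook request."""
    received = headers.get("x-telegram-bot-api-secret-token")
    if not received:
        return False
    # Constant-time comparison, so the check does not leak timing information.
    return hmac.compare_digest(received.encode(), expected.encode())


# Placeholder values, not real credentials.
url = set_webhook_url("123:ABC", "https://example.com/webhook", "s3cret_token")
```

The same secret has to be available on both sides: passed once to setWebhook and exposed to the function that validates the header, for example through an environment variable.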
See it in action!
t.me/sheriff_labrador_captcha_bot
Source code
skhaz/sheriff-labrador-captcha-bot
In conclusion
The Serverless Framework is an excellent tool for working with AWS Lambda and other resources. It even offers the flexibility to utilize CloudFormation, although this particular article does not focus on that aspect. By combining the Serverless Framework with GitHub Actions, it becomes possible to create a highly scalable development environment that is also pleasant to work with. You can have multiple versions, such as staging, production, QA, and even one for each pull request.