Telegram groups and spam issues

Telegram is one of the most popular messaging applications around the world. Personally, I am an active member of various groups focused on discussing programming languages and challenges. However, these groups are frequently infiltrated by spam accounts that post unwanted messages, usually related to scams.

How can a bot combat spam?

A bot, as it is called on the platform (or a robot, if you prefer), is an account that can be automated through the official Bot API, which Telegram documents on its website.

Combatting spam with Telegram bots

There are various methods to combat spam, ranging from simple to elaborate. The method we will discuss today is a middle ground. The idea is to create a Telegram bot that presents a captcha to a newly joined group member. If the member sends a message of any type that is not the correct response to the captcha, it will be immediately deleted. Once they successfully respond to the captcha, they are permitted to send any message. This method is effective because many spammer accounts lack the ability to respond to a captcha.

Webhooks

Webhooks are a mechanism that enables real-time communication between different applications or systems over the internet. They allow one application to send data to another application as soon as a specific event occurs. This event-driven approach is commonly used for automating tasks, integrating services, and keeping data synchronized between different platforms. Webhooks eliminate the need for continuous polling by making data transmission instantaneous when a predefined trigger event occurs.

To reduce costs and avoid paying for a virtual machine that runs the bot continuously, we will use webhooks. With serverless cloud functions (such as AWS Lambda), we pay only for actual usage.

Polling is not an option here, because a Lambda function only runs when something invokes it. Instead, we will set up a webhook in Telegram that calls the public endpoint of the API Gateway, which in turn triggers the Lambda function handler.
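As a minimal sketch of what a webhook delivery looks like from the function's side, assume the API Gateway event carries Telegram's JSON update as a string in its body field. The handle_webhook name, the shape of the returned dictionary, and the sample payload are illustrative only and are not part of the bot.

```python
import json


def handle_webhook(event: dict) -> dict:
    """Hypothetical Lambda-style handler: runs once per request, with no polling loop."""
    body = event.get("body")
    if not body:
        # Nothing to process; acknowledge the request anyway.
        return {"statusCode": 200, "update_id": None}

    update = json.loads(body)
    # Telegram identifies every update with an integer `update_id`.
    return {"statusCode": 200, "update_id": update.get("update_id")}


if __name__ == "__main__":
    sample = {"body": json.dumps({"update_id": 1, "message": {"text": "hi"}})}
    print(handle_webhook(sample))
```

The function does nothing between requests, which is what makes the pay-per-invocation model a good fit.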

The plan

Here’s what we’re going to do: we’ll have two Lambdas - one written in Go and the other in Python. The Python Lambda will handle the Telegram API, which is essentially the logic behind our Telegram bot. When a user enters, it will generate a four-letter word (the captcha value) and store it in Redis. Meanwhile, the Go service will create the captcha image for its value, and provide a public URL to access it from. This keeps the Python Lambda focused on one task. Although it would be possible to generate the image in Python and unify the code, I encountered issues with linking Pillow, an image manipulation library, with Lambda runtime libraries when using the Serverless Framework. The only viable option I found was using Lambda layers, which I decided to avoid.

Next, there is another handler that deals with any message sent by users, whether text, images, GIFs, audio, or anything else. The handler queries Redis for the user’s captcha cipher. If none is found, the routine ends. If one is found, the handler checks whether the message matches the captcha. If it does not match, the message is deleted. If it matches, the Redis key is deleted and the user is welcomed to the group. Finally, a third handler is triggered when a user leaves the group; it deletes the user’s Redis keys so that no data lingers in Redis.
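The flow described above can be sketched without Telegram or Redis at all. In this illustration a plain dictionary stands in for Redis, and the handlers return the name of the action the bot would take instead of calling any API. The function signatures and the action names are assumptions made for the sketch.

```python
import re
from typing import Dict, Optional, Tuple

# A plain dict stands in for Redis; each key identifies a user within a chat.
pending: Dict[Tuple[int, int], str] = {}


def on_enter(chat_id: int, user_id: int, cipher: str) -> None:
    """Remember the captcha value the newcomer has to type."""
    pending[(chat_id, user_id)] = cipher


def on_message(chat_id: int, user_id: int, text: Optional[str]) -> str:
    """Return the action the bot would take for this message."""
    cipher = pending.get((chat_id, user_id))
    if cipher is None:
        return "ignore"  # Already verified, or a longstanding member.

    # Non-text messages (text is None) can never match the captcha.
    if not text or re.sub(r"\s+", "", text).upper() != cipher:
        return "delete"

    del pending[(chat_id, user_id)]
    return "welcome"


def on_leave(chat_id: int, user_id: int) -> None:
    """Drop any unanswered captcha so that no stale data lingers."""
    pending.pop((chat_id, user_id), None)


if __name__ == "__main__":
    on_enter(1, 42, "ABCD")
    print(on_message(1, 42, "hello"))    # delete
    print(on_message(1, 42, "a b c d"))  # welcome
    print(on_message(1, 42, "hello"))    # ignore
```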

The Code

Generating captcha on-the-fly

Let’s begin with the service that generates the image. It’s important to remember that the service should provide a public URL, as the Telegram API requires a public URL. Internally, the API will download the image and distribute it.

// Directive that instructs the compiler to embed the font into the binary.
//go:embed arial.ttf
var arial []byte

// AWS Lambda Go entrypoint.
func Handler(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	var text = request.QueryStringParameters["text"]

	var (
		width, height = 200, 100
		// Create the "buffer" that will be used for drawing beneath the text.
		captcha       = image.NewRGBA(image.Rect(0, 0, width, height))
	)

	// Fill the image with a white background.
	draw.Draw(captcha, captcha.Bounds(), image.White, image.Point{}, draw.Src)

	var (
		f   *opentype.Font
		err error
	)

	// Open and parse the embedded font in the binary.
	if f, err = opentype.Parse(arial); err != nil {
		panic(err)
	}

	var (
		face     font.Face
		fontSize = 24.0
	)

	// Instantiate the font with the provided parameters.
	if face, err = opentype.NewFace(f, &opentype.FaceOptions{Size: fontSize, DPI: 72}); err != nil {
		panic(err)
	}
	defer face.Close()

	// Declaration of multiple variables and some calculations.
	var (
		drawer = &font.Drawer{
			Dst:  captcha,
			Src:  image.NewUniform(color.Black),
			Face: face,
		}

		totalTextWidth = font.MeasureString(face, text).Ceil()
		spacing        = (width - totalTextWidth) / (len(text) + 1)
		x              = spacing
		y              = (height + int(fontSize)) / 2
	)

	// Write the text in a centered and spaced manner.
	for _, char := range text {
		charWidth := font.MeasureString(face, string(char)).Ceil()
		drawer.Dot = fixed.Point26_6{X: fixed.I(x), Y: fixed.I(y)}
		drawer.DrawString(string(char))
		x += charWidth + spacing
	}

	// Encode the captcha's bitmap in PNG format.
	var buffer bytes.Buffer
	if err = png.Encode(&buffer, captcha); err != nil {
		panic(err)
	}

	// As we are using the API v2, also called HTTP API, which is faster and cheaper than v1, we should return the response in base64 encoded format.
	// Reference: https://www.serverless.com/framework/docs/providers/aws/events/http-api
	return events.APIGatewayProxyResponse{StatusCode: 200, Headers: map[string]string{"Content-Type": "image/png"}, Body: base64.StdEncoding.EncodeToString(buffer.Bytes()), IsBase64Encoded: true}, nil
}
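The centering arithmetic in the handler can be checked outside Go. The sketch below, written in Python for brevity, mirrors the spacing and x calculations. It assumes the per-character widths are already known as whole pixels (in the Go code they come from font.MeasureString) and approximates the total text width as their sum. The layout function is hypothetical.

```python
from typing import List


def layout(width: int, char_widths: List[int]) -> List[int]:
    """Return the x position of each character, mirroring the Go drawing loop."""
    total = sum(char_widths)  # Assumption: stands in for MeasureString(face, text).
    spacing = (width - total) // (len(char_widths) + 1)
    positions = []
    x = spacing
    for char_width in char_widths:
        positions.append(x)
        x += char_width + spacing
    return positions


if __name__ == "__main__":
    print(layout(200, [15, 15, 15, 15]))  # [28, 71, 114, 157]
```

With four characters of 15 pixels each in a 200-pixel image, the 140 remaining pixels are split into five equal gaps of 28 pixels: one before the first character, one after the last, and three in between.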

The actual bot

This will be the actual bot, responsible for receiving and processing webhooks from Telegram’s servers.

To make things easier for us, we’re using python-telegram-bot. I personally highly recommend it if you’re planning to create bots.

Telegram handlers

The python-telegram-bot library provides a clean way to define handlers, with three in particular that we will use. We will define on_enter to handle when the user enters, on_leave to handle when they leave, and on_message to capture all messages not covered by the previous handlers.

application = (
    Application.builder().token(os.environ["TELEGRAM_TOKEN"]).updater(None).build()
)

application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, on_enter))
application.add_handler(MessageHandler(filters.StatusUpdate.LEFT_CHAT_MEMBER, on_leave))
application.add_handler(
    MessageHandler(
        filters.ALL
        & ~filters.StatusUpdate.NEW_CHAT_MEMBERS
        & ~filters.StatusUpdate.LEFT_CHAT_MEMBER,
        on_message,
    )
)

Now, the implementation of each one:

async def on_enter(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handler responsible for the onboarding of new users, which can be either a single entry or an array.
    """
    message = update.message
    if not message:
        return

    for user in message.new_chat_members:
        if not user:
            continue

        if user.is_bot:
            continue

        # Generate the cipher, in this case, we use only four uppercase letters.
        cipher = "".join(random.sample(string.ascii_uppercase, 4))
        # Generate the public URL of the captcha with the cipher.
        # The environment variable 'ENDPOINT' comes from the Serverless configuration, which we will see next, but essentially, it is the endpoint of the Go Lambda.
        url = "?".join([os.environ["ENDPOINT"], urlencode({"text": cipher})])
        caption = "Woof! In order for your entry to be accepted into the group, please answer the captcha."  # noqa

        # Send the message with the photo, we use reply_photo in the user's input message, this way they receive a notification.
        response = await message.reply_photo(url, caption=caption)

        # Since we will be performing multiple operations in Redis, and one operation doesn't depend on another, we will use a pipeline. A pipeline works similarly to a transaction, grouping all operations into a single call.
        # We save the cipher, the ID of the captcha message, and the ID of the user's join message in Redis so that we can delete the captcha as soon as it's answered and also delete the join message in case they leave without responding, to keep the group clean.
        pipe = redis.pipeline()
        pipe.set(f"ciphers:{message.chat_id}:{user.id}", cipher)
        pipe.set(f"messages:{message.chat_id}:{user.id}", response.id)
        pipe.set(f"joins:{message.chat_id}:{user.id}", message.id)
        await pipe.execute()


async def on_leave(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handles when a single user leaves.
    """
    message = update.message
    if not message:
        return

    user = message.left_chat_member
    if not user:
        return

    if user.is_bot:
        return

    # As explained above, we perform the Redis operations in batches.
    pipe = redis.pipeline()
    pipe.get(f"messages:{message.chat_id}:{user.id}")
    pipe.get(f"joins:{message.chat_id}:{user.id}")
    pipe.delete(f"ciphers:{message.chat_id}:{user.id}")
    pipe.delete(f"messages:{message.chat_id}:{user.id}")
    pipe.delete(f"joins:{message.chat_id}:{user.id}")

    message_id, join_id, *_ = await pipe.execute()

    # Members who joined before the bot was added have no stored IDs, so skip the missing ones.
    stale_ids = [int(value) for value in (message_id, join_id) if value]

    # As we're using asyncio, let's take advantage of it by performing some operations concurrently.
    await asyncio.gather(
        *(
            context.bot.delete_message(chat_id=message.chat_id, message_id=stale_id)
            for stale_id in stale_ids
        ),
        message.delete(),
    )


async def on_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Manage every message that has been sent to the group
    """
    message = update.message
    if not message:
        return

    user = message.from_user
    if not user:
        return

    cipher = await redis.get(f"ciphers:{message.chat_id}:{user.id}")

    # If there's no cipher, the captcha has already been answered or the message is from a longstanding member.
    if not cipher:
        return

    text = message.text

    # Compare the user's text against the cipher, ignoring all whitespace and case.
    # If the message is not a text message, delete it.
    if not text or cipher.decode() != re.sub(r"\s+", "", text).upper():
        # Delete user's message.
        await message.delete()
        return

    message_id = await redis.get(f"messages:{message.chat_id}:{user.id}")

    # The user guessed correctly, so we remove the cipher from Redis and delete the message to prevent flooding.
    await asyncio.gather(
        context.bot.delete_message(
            chat_id=message.chat_id,
            message_id=message_id.decode(),
        ),
        redis.delete(f"ciphers:{message.chat_id}:{user.id}"),
        message.delete(),
    )

    # Usernames are optional on Telegram, so fall back to the user's full name.
    mention = f"[{user.username or user.full_name}](tg://user?id={user.id})"

    # Finally, welcome the new verified user.
    await context.bot.send_message(
        message.chat_id,
        f"{mention}, welcome to the group! Au!",
        parse_mode=ParseMode.MARKDOWN,
    )

Handling the Webhooks

The Python runtime on AWS Lambda expects a synchronous handler, but we want to use async so that the bot can perform multiple tasks concurrently.

To bridge the two, the Lambda’s entrypoint is the synchronous telegram function, which runs an asynchronous function with asyncio’s run_until_complete. The bot’s real entrypoint is therefore the main function, which deserializes the JSON received from Telegram and hands the update to the library, which in turn calls the matching bot handlers.

async def main(event: APIGatewayProxyEventV1):
    body = event["body"]
    if not body:
        return

    # Parses the JSON from the HTTP request body and enqueues it in the processing queue of the library.
    async with application:
        await application.process_update(
            Update.de_json(json.loads(body), application.bot)
        )


def equals(left, right):
    """
    Secure string comparison against timing attacks.

    Reference: https://sqreen.github.io/DevelopersSecurityBestPractices/timing-attack/python
    """
    if not left or not right:
        return False

    # hmac.compare_digest (requires `import hmac`) compares in constant time.
    # A loop that returns on the first mismatch would leak timing information.
    return hmac.compare_digest(left.encode(), right.encode())


def telegram(event: APIGatewayProxyEventV1, context: Context):
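    # We set up a secret token in the Telegram webhook (the secret_token parameter of setWebhook). From that moment on, Telegram sends the token in every webhook request. We compare it against the SECRET environment variable, which must hold the same value, to ensure that the request is coming from Telegram.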
    if not equals(
        event["headers"].get("x-telegram-bot-api-secret-token"),
        os.environ["SECRET"],
    ):
        return {
            "statusCode": 401,
        }

    # Process the incoming request.
    asyncio.get_event_loop().run_until_complete(main(event))

    return {
        "statusCode": 200,
    }
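The bridge between the synchronous entrypoint and the asynchronous code can be tried in isolation. The sketch below is an assumption-laden variant rather than the bot's code: it creates one event loop at import time with asyncio.new_event_loop() and reuses it on every call, since newer Python versions discourage calling asyncio.get_event_loop() when no loop has been set. The process coroutine and the returned dictionary are placeholders.

```python
import asyncio

# Assumption: one loop created at import time and reused across warm invocations.
_loop = asyncio.new_event_loop()


async def process(event: dict) -> str:
    """Hypothetical async work; the real bot would dispatch to its handlers here."""
    await asyncio.sleep(0)
    return event.get("body") or ""


def handler(event: dict, context: object = None) -> dict:
    """Synchronous entrypoint that blocks until the coroutine finishes."""
    result = _loop.run_until_complete(process(event))
    return {"statusCode": 200, "processed": result}


if __name__ == "__main__":
    print(handler({"body": "ping"}))
```

Keeping the loop open between calls matters because a warm Lambda container runs the handler many times in the same process.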

The Serverless side

I’m a big fan of Infrastructure as Code (IaC), and the Serverless framework is a great tool for achieving it. With this framework, I can have my application’s code alongside its deployment. If necessary, I can extend it using CloudFormation, but that’s not the case here. Our use case is simple: we’ll have an internet-exposed API Gateway and two lambdas.

service: sheriff-labrador-captcha

frameworkVersion: "3"

configValidationMode: error # We want errors in any issue.

provider:
  name: aws
  region: us-east-1 # I know, this is the most crowded region.
  architecture: arm64 # ARM64 AWS Lambdas are more cost-effective and efficient.
  stage: development

functions:
  telegram: # Our Python handler
    runtime: python3.10
    handler: handler.telegram
    events:
      - httpApi:
          path: /webhook
          method: post
    environment:
      ENDPOINT: !GetAtt HttpApi.ApiEndpoint # This is how we avoid using any hardcoded value; we retrieve the public endpoint from the API Gateway using the `GetAtt` function.
      REDIS_DSN: ${env:REDIS_DSN} # We will utilize Redis from UpStash instead of AWS because it is more cost-effective.
      TELEGRAM_TOKEN: ${env:TELEGRAM_TOKEN} # The token of our bot. It's necessary to talk to the BotFather to create a new bot.

  captcha: # Our Go handler
    runtime: provided.al2 # We're using the Amazon Linux 2 custom runtime instead of go1.x because go1.x has been deprecated; provided.al2 is the recommended replacement.
    handler: bootstrap # On provided.al2, your binary should be named "bootstrap".
    events:
      - httpApi:
          path: /
          method: get
plugins:
  - serverless-python-requirements # Necessary to install the Python dependencies listed in requirements.txt.

Deploy

In addition to being a big fan of Infrastructure as Code, I am also a strong advocate for Continuous Integration/Continuous Delivery (CI/CD). Therefore, we will utilize a GitHub Action to handle the deployment.

First, we need a few things, starting with AWS credentials, namely AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. You can generate these in IAM. I recommend creating a restricted account for the bot. Both should be stored in the Action’s “secrets.”

You will also need to create a Redis database on UpStash. Create a secret with the key REDIS_DSN and the value provided by UpStash. If you choose to use SSL, change the protocol in the URL from “redis” to “rediss” (with two “s” characters at the end).
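If you would rather derive the SSL variant from the plain DSN than edit it by hand, a small helper along these lines would do. The with_tls name and the example host are placeholders, not values from UpStash.

```python
from urllib.parse import urlsplit, urlunsplit


def with_tls(dsn: str) -> str:
    """Switch a redis:// DSN to rediss:// and leave everything else untouched."""
    parts = urlsplit(dsn)
    if parts.scheme == "redis":
        parts = parts._replace(scheme="rediss")
    return urlunsplit(parts)


if __name__ == "__main__":
    # Placeholder credentials and host, for illustration only.
    print(with_tls("redis://default:password@example.invalid:6379"))
```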

Lastly, we need the bot token. This can be generated using BotFather on Telegram itself. Create another GitHub Secret with the key TELEGRAM_TOKEN and the value provided by BotFather.

name: Deploy

on:
  push:
    branches:
      - main

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      # We only need Python to run the code health tools, such as Black for checking indentation, style, and other code issues; MyPy for identifying typing problems; and Ruff, which I consider to be a better and faster alternative to Flake8.
      - name: Use Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"
          cache: pip

      - name: Install Dependencies
        run: pip install -r requirements.txt

      - name: Install Tools
        run: pip install black mypy ruff

      - name: Lint Python
        run: |
          black --check handler.py 
          ruff check handler.py
          mypy handler.py          

      # We need Go to run the linting tools, and we'll need it again in the next job to build the binary.
      - name: Use Go
        uses: actions/setup-go@v4
        with:
          cache-dependency-path: go.sum
          go-version-file: go.mod

      # golangci-lint is an excellent tool for linting.
      - name: Lint Go
        uses: golangci/golangci-lint-action@v3
        with:
          args: --timeout=8m
          skip-pkg-cache: true # workaround

  deploy:
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      # It's time to build the Go binary. We'll use some flags to make the binary as lean and compact as possible. Additionally, the lambda.norpc build tag drops the RPC component of the aws-lambda-go library, which the provided.al2 runtime does not need. These optimizations help reduce cold start times.
      - name: Build Go Lambda Function
        run: |
          go get -u -t -d -v ./...
          go mod tidy
          go mod vendor
          go build -ldflags="-s -w" -trimpath -tags lambda.norpc -o bootstrap main.go          
        env:
          GOARCH: arm64
          GOOS: linux

      - name: Cache Node Modules
        uses: actions/cache@v3
        with:
          key: npm
          path: ~/.npm

      # The Serverless framework is written in Node.js, so we need to install Node.js, the Serverless CLI itself, and the plugin for installing dependencies from the `requirements.txt` file.
      - name: Use Node.js
        uses: actions/setup-node@v3
        with:
          node-version: "20"

      - name: Install Serverless Framework
        run: npm install -g serverless serverless-python-requirements

      # If all variables are configured and accurate, the deployment will proceed.
      - name: Deploy
        run: serverless deploy
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          REDIS_DSN: ${{ secrets.REDIS_DSN }}
          TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}

      - name: Set Webhook
        run: |
          URL=$(serverless info --verbose --stage development | grep "/webhook" | grep -Eo "https://[^  ]+")
          echo "Setting webhook to ${URL}"
          curl "https://api.telegram.org/bot${TELEGRAM_TOKEN}/setWebhook?url=${URL}"          
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          REDIS_DSN: ${{ secrets.REDIS_DSN }}
          TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}

Security

For Telegram to access the webhook URL, it must be publicly available. However, leaving it completely unprotected is not advisable. Several measures can be taken:

  • Use a secret string in the URL path; it can even be the Bot’s token, which is commonly done.
  • Verify the source IP of the request; Telegram has fixed IPs from which it makes HTTP calls.
  • Use the secret_token parameter in the setWebhook call with a secret token, and then verify it in the X-Telegram-Bot-Api-Secret-Token header, as the telegram handler shown earlier does. This is the most secure approach in my opinion.
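The third option in the list above can be sketched in two small functions: one builds the setWebhook request with the secret_token parameter, and the other checks the header on incoming requests. The function names are hypothetical, the token and URL in the example are placeholders, and the lowercase header key assumes an API Gateway HTTP API event, which delivers header names in lowercase.

```python
import hmac
from urllib.parse import urlencode


def set_webhook_url(bot_token: str, webhook_url: str, secret: str) -> str:
    """Build the setWebhook call that registers the URL together with the secret."""
    query = urlencode({"url": webhook_url, "secret_token": secret})
    return f"https://api.telegram.org/bot{bot_token}/setWebhook?{query}"


def is_from_telegram(headers: dict, secret: str) -> bool:
    """Compare the header Telegram sends back against the configured secret."""
    received = headers.get("x-telegram-bot-api-secret-token", "")
    # compare_digest takes the same time whether or not the values match.
    return hmac.compare_digest(received.encode(), secret.encode())


if __name__ == "__main__":
    # Placeholder token, URL, and secret, for illustration only.
    print(set_webhook_url("123:ABC", "https://example.com/webhook", "s3cret"))
```

Telegram restricts the secret token to letters, digits, underscores, and hyphens, so a random alphanumeric string is a safe choice.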

See it in action!

t.me/sheriff_labrador_captcha_bot

Sheriff Labrador Captcha Bot In Action

Source code

skhaz/sheriff-labrador-captcha-bot

In conclusion

The Serverless Framework is an excellent tool for working with AWS Lambda and other resources. It even offers the flexibility to utilize CloudFormation, although this particular article does not focus on that aspect. By combining the Serverless Framework with GitHub Actions, it becomes possible to create a highly scalable development environment that is also pleasant to work with. You can have multiple versions, such as staging, production, QA, and even one for each pull request.
