Criando o meu primeiro pacote open source em Go! Lazuli, o pacote para interagir com Bluesky e AT Protocol

Criando o meu primeiro pacote open source em Go! Lazuli, o pacote para interagir com Bluesky e AT Protocol

Buenas para você que está lendo este artigo! Você, caso esteja acompanhando a série que estou fazendo sobre processamento de eventos, pode estar se perguntando: "Cadê o resto da série?”. Bem, a verdade é que ainda não finalizei algumas partes do desenho da solução e vou precisar de mais um tempo. Quem nunca teve que renegociar um prazo uma vez, não é mesmo?!

Porém, para não deixar o blog sem artigo, tive a ideia de pegar parte do código utilizado no artigo anterior, como o pacote de acesso ao Bluesky e AT Protocol, e de fato transformá-lo em um pacote open-source e disponibilizá-lo. Assim, ficará até mais fácil acompanhar os próximos artigos e você também poderá ter a chance de contribuir com um projeto open-source. Legal, né? Então vamos lá!

Primeiras funcionalidades

Bem, as primeiras funcionalidades abordadas aqui serão as que já criamos anteriormente. Claro, daria para adicionar mais, e até vou continuar trabalhando nesse pacote em paralelo ao nosso projeto, pois a ideia é usá-lo. Porém, pela brevidade do artigo e também pela minha experiência nula em criar um pacote público em Go, vamos nos manter em:

  1. Criar um client, que será usado para fazer toda a comunicação com o Bluesky, utilizando o AT Protocol.
    1. Adicionar o método de consumir a Firehose do Bluesky.
    2. Adicionar o método de criar uma sessão, ou seja, se autenticar no Bluesky.
    3. Adicionar o método para criar um repost.
    4. Adicionar o método para criar um like.

Minha ideia para as próximas funcionalidades é adicionar alguns métodos de get de posts, likes, reposts, entre outras informações do Bluesky que usaremos em nosso projeto. Funcionalidades de manipulação de perfil, listas, chats, entre outras, neste momento terão baixa prioridade.

A estrutura final desse primeiro passo do nosso projeto será:

go-lazuli
│   .env
│   .gitignore
│   go.mod
│   go.sum
│   LICENSE
│   README.md
│
├───example
│   ├───authentication
│   │       authentication-example.go
│   │
│   └───firehose
│           firehose-example.go
│
│
└───pkg
    └───lazuli
        │   client.go
        │   firehose.go
        │   session.go
        │
        └───dto
                auth.go
                commit_event.go
                did.go
                record.go
                repo_commit_event.go
                repo_strong_ref.go
Nota: o código está em desenvolvimento, então essa estrutura pode ser modificada. O código também sofrerá modificações ao longo do desenvolvimento, claro, sempre trazendo melhorias. Logo, não me comprometo com a compatibilidade até a primeira versão 1.x; antes disso, poderão ocorrer incompatibilidades entre as versões geradas.

Também teremos outro artigo onde mostrarei como fazer alguns ajustes no repositório do GitHub, bem como disponibilizar nosso pacote no pkg.go.dev. Acho que vai ser bacana compartilhar esse aprendizado aqui. Espero que você também curta daí!

Criando o client

Vamos lá então criar o Client do lazuli, mas antes, vou fazer uma obserção: as estruturas que iremos utilizar (que por hora chamo de dto) não colocarei aqui os códigos dela, mas já está disponível no repositório aqui.

No código a seguir, criaremos um cliente Go para interagir com o Bluesky e o AT Protocol. A ideia principal é fornecer uma interface que facilite a comunicação com a API do Bluesky, tanto via HTTP quanto via WebSocket.

Estrutura do Client

Primeiramente, definimos uma interface Client que expõe quatro métodos principais:

  1. ConsumeFirehose: responsável por consumir o Firehose do Bluesky via WebSocket, ou seja, processar grandes fluxos de dados em tempo real.
  2. CreateSession: usado para autenticar o cliente no Bluesky com um identificador (e-mail ou handle) e senha.
  3. CreateRepostRecord: cria um "repost" (compartilhamento) no Bluesky.
  4. CreateLikeRecord: cria um "like" em uma publicação do Bluesky.

A implementação real dessa interface é feita pela struct privada client, que guarda informações importantes como a URL da API e a URL do WebSocket (xrpcURL e wsURL), além da sessão autenticada.

O Construtor do Client

O método NewClient é o construtor que inicializa uma instância de client, onde definimos a URL do endpoint e o WebSocket que o cliente usará para se conectar ao Bluesky. Também é criado um WebSocket dialer padrão que será utilizado para se comunicar com o Firehose.

func NewClient(xrpcURL, wsURL string) Client {
    dialer := *websocket.DefaultDialer
    return &client{
        xrpcURL:  xrpcURL,
        wsURL:    wsURL,
        wsDialer: &dialer,
    }
}

Criando Registros (Repost e Like)

Os métodos CreateRepostRecord e CreateLikeRecord utilizam um método comum createRecord, que constrói uma requisição HTTP para a API de criação de registros do Bluesky.

Dentro de createRecord, criamos o corpo da requisição, no formato exigido pela API, utilizando a struct RequestRecordBody e a convertendo para JSON com json.Marshal.

Após isso, enviamos essa requisição via HTTP POST para a API do Bluesky. Se a requisição for bem-sucedida (status 200), o registro é criado. Simples, não é mesmo? E o melhor que boa parte está sendo utilizar a biblioteca padrão).

`func (c *client) CreateRepostRecord(ctx context.Context, p dto.CreateRecordParams) error {
    p.Resource = "app.bsky.feed.repost"
    return c.createRecord(ctx, p)
}

func (c *client) CreateLikeRecord(ctx context.Context, p dto.CreateRecordParams) error {
    p.Resource = "app.bsky.feed.like"
    return c.createRecord(ctx, p)
}

Esses métodos são responsáveis por especificar o tipo de recurso (como repost ou like) e delegar a criação do registro para o método genérico createRecord.

O Método createRecord

O método createRecord é a parte central da lógica. Ele monta o corpo da requisição com os dados fornecidos e faz a chamada à API usando HTTP. Caso ocorra um erro durante a construção ou envio da requisição, ele registra no log, mas a função em si retorna o erro para que possa ser tratado.

func (c *client) createRecord(ctx context.Context, p dto.CreateRecordParams) error {
	body := dto.RequestRecordBody{
		LexiconTypeID: p.Resource,
		Collection:    p.Resource,
		Repo:          c.session.DID,
		Record: dto.RequestRecord{
			Subject: dto.RepoStrongRef{
				URI: p.URI,
				CID: p.CID,
			},
			CreatedAt: time.Now().UTC(),
		},
	}

	jsonBody, _ := json.Marshal(body)

	url := fmt.Sprintf("%s/com.atproto.repo.createRecord", c.xrpcURL)
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))
	if err != nil {
		slog.Error("fail to create record request struct", "error", err, "resource", p.Resource)
		return nil
	}

	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.session.AccessJwt))
	req.Header.Set("Content-Type", "application/json")

	httpClient := &http.Client{}
	resp, err := httpClient.Do(req)
	if err != nil {
		slog.Error("fail to do request to create record", "error", err, "resource", p.Resource)
		return nil
	}
	if resp.StatusCode != http.StatusOK {
		slog.Error("create record request failed", "status", resp, "resource", p.Resource)
		return fmt.Errorf("create record request failed: %d", resp.StatusCode)
	}

	slog.Info("record created", "resource", p.Resource)

	return nil
}

Essa abordagem modular torna (ou pelo menos era a ideia de tornar) o cliente flexível para evoluir à medida que novas funcionalidades são adicionadas, mantendo a base sólida para a interação com o Bluesky. Pelo menos essa é a ideia que tenho e gostaria de ouvir (ou ler na verdade) de você o que acha dessa abordagem. Por isso que digo também que eo código vai evoluir.

O código final do client fica:

package lazuli

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"time"

	"github.com/augustoasilva/go-lazuli/pkg/lazuli/dto"
	"github.com/gorilla/websocket"
)

type Client interface {
	ConsumeFirehose(ctx context.Context, handler HandlerCommitFn) error
	CreateSession(ctx context.Context, identifier, password string) (*dto.AuthResponse, error)
	CreateRepostRecord(ctx context.Context, p dto.CreateRecordParams) error
	CreateLikeRecord(ctx context.Context, p dto.CreateRecordParams) error
}

type client struct {
	xrpcURL  string
	wsURL    string
	wsDialer *websocket.Dialer
	session  *dto.AuthResponse
}

func NewClient(xrpcURL, wsURL string) Client {
	dialer := *websocket.DefaultDialer
	return &client{
		xrpcURL:  xrpcURL,
		wsURL:    wsURL,
		wsDialer: &dialer,
	}
}

func (c *client) createRecord(ctx context.Context, p dto.CreateRecordParams) error {
	body := dto.RequestRecordBody{
		LexiconTypeID: p.Resource,
		Collection:    p.Resource,
		Repo:          c.session.DID,
		Record: dto.RequestRecord{
			Subject: dto.RepoStrongRef{
				URI: p.URI,
				CID: p.CID,
			},
			CreatedAt: time.Now().UTC(),
		},
	}

	jsonBody, _ := json.Marshal(body)

	url := fmt.Sprintf("%s/com.atproto.repo.createRecord", c.xrpcURL)
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))
	if err != nil {
		slog.Error("fail to create record request struct", "error", err, "resource", p.Resource)
		return nil
	}

	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.session.AccessJwt))
	req.Header.Set("Content-Type", "application/json")

	httpClient := &http.Client{}
	resp, err := httpClient.Do(req)
	if err != nil {
		slog.Error("fail to do request to create record", "error", err, "resource", p.Resource)
		return nil
	}
	if resp.StatusCode != http.StatusOK {
		slog.Error("create record request failed", "status", resp, "resource", p.Resource)
		return fmt.Errorf("create record request failed: %d", resp.StatusCode)
	}

	slog.Info("record created", "resource", p.Resource)

	return nil
}

func (c *client) CreateRepostRecord(ctx context.Context, p dto.CreateRecordParams) error {
	p.Resource = "app.bsky.feed.repost"
	return c.createRecord(ctx, p)
}

func (c *client) CreateLikeRecord(ctx context.Context, p dto.CreateRecordParams) error {
	p.Resource = "app.bsky.feed.like"
	return c.createRecord(ctx, p)
}

Criando o nosso método de consumo da Firehose

Esse código que vou mostrar agora, que na verdade já mostrei no artigo anterior, implementa o método ConsumeFirehose, que conecta a um WebSocket, lê mensagens em tempo real (nesse caso, eventos de commit em um repositório), decodifica essas mensagens usando CBOR, e processa cada uma delas com uma função handler que você passa como parâmetro. Então, vamos lá para uma explicação mais detalhada dessa vez?

O que acontece no código?

Tratamento de erros na decodificação: Se houver algum erro ao decodificar a mensagem, como se a mensagem não estiver no formato esperado ou tiver algum problema nos dados, o código trata o erro. Caso seja o final do fluxo de dados (ou seja, não tem mais o que ler), o erro será io.EOF, o que faz o laço parar. Para outros tipos de erro, ele imprime no log e retorna o erro.

if decodeErr != nil {
    if decodeErr == io.EOF {
        break
    }
    slog.Error("fail to decode repo commit event message", "error", decodeErr)
    return decodeErr
}

Tratamento do evento: Se a decodificação funcionar, o código passa o evento para a função handler que foi passada quando o método foi chamado. Essa função handler é responsável por fazer algo útil com o evento de commit (como salvar no banco, processar as alterações, etc.). Se o handler retornar um erro, o código dá um panic e encerra a execução, já que esse erro não é esperado.

if handleErr := handler(evt); handleErr != nil {
    panic(handleErr)
}

Caso contrário, ele continua processando os próximos eventos. E lembrando que, essa função de callback que passamos é bem similar a antiga, mas agora tabalha com interface e você verá no futuro o porque disto!

Decodificação das mensagens com CBOR: As mensagens recebidas estão no formato CBOR, um formato binário compacto(já expliquei no artigo anterior, viu?!). Então, criamos um decoder para decodificar essas mensagens.

decoder := cbor.NewDecoder(bytes.NewReader(message))

Depois disso, o código entra em outro laço (for) para processar todas as mensagens dentro do WebSocket. Ele vai tentar decodificar cada evento da mensagem para a struct RepoCommitEvent, que é um evento de commit no repositório.

Leitura das mensagens: Depois de conectar ao WebSocket, entramos num laço infinito (for), onde o código fica esperando por novas mensagens que chegam da conexão. Cada mensagem que chega é lida com conn.ReadMessage().

_, message, errMessage := conn.ReadMessage()
if errMessage != nil {
    slog.Error("fail to read message from websocket", "error", errMessage)
    return errMessage
}

Se tiver algum erro ao ler a mensagem, ele já registra no log e retorna o erro, interrompendo a execução.

Conexão com o WebSocket: Logo no início, o método tenta se conectar ao WebSocket usando o dialer que configuramos no cliente. Se a conexão falhar, ele retorna um erro e a função para por aí.

conn, _, err := c.wsDialer.Dial(c.wsURL, nil)
if err != nil {
    slog.Error("fail to connect to websocket", "error", err)
    return err
}

Se a conexão der certo, ele imprime uma mensagem de sucesso no log e segue para a parte de leitura de mensagens.

Resumindo, esse método ConsumeFirehose é responsável por manter uma conexão contínua com o WebSocket, ler eventos em tempo real, decodificar esses eventos (que estão em CBOR) e repassar para uma função de tratamento. O código é bem simples inicial, mas ele deverá se tornar robusto no sentido de capturar e lidar com erros de conexão, leitura, e decodificação. Tudo isso será essencial quando estamos trabalhando com sistemas de alta volumetria de dados, como nesse caso, onde vários eventos de commit podem ser enviados rapidamente. Por isso ainda tem um TODO no comentário para me lembrar de melhorar a flexibilidade do consumidor e também vai me lembrar de melhorar a parte de conexão com o websocket.

O código final fica:

package lazuli

import (
	"bytes"
	"context"
	"io"
	"log/slog"

	"github.com/augustoasilva/go-lazuli/pkg/lazuli/dto"
	"github.com/fxamacker/cbor/v2"
)

type HandlerCommitFn func(evt dto.CommitEvent) error

// ConsumeFirehose connects to a websocket, reads messages, decodes them as repo commit events, and processes them using a handler function.
//
// TODO: improve firehose consumer to be more flexible
func (c *client) ConsumeFirehose(ctx context.Context, handler HandlerCommitFn) error {
	conn, _, err := c.wsDialer.Dial(c.wsURL, nil)
	if err != nil {
		slog.Error("fail to connect to websocket", "error", err)
		return err
	}
	defer conn.Close()

	slog.Info("websocket connected", "url", c.wsURL)

	for {
		_, message, errMessage := conn.ReadMessage()
		if errMessage != nil {
			slog.Error("fail to read message from websocket", "error", errMessage)
			return errMessage
		}

		decoder := cbor.NewDecoder(bytes.NewReader(message))

		for {
			var evt dto.RepoCommitEvent
			decodeErr := decoder.Decode(&evt)
			if decodeErr != nil {
				if decodeErr == io.EOF {
					break
				}
				slog.Error("fail to decode repo commit event message", "error", decodeErr)
				return decodeErr
			}

			if handleErr := handler(evt); handleErr != nil {
				panic(handleErr)
			}
		}
	}
}

Criando nosso método de autenticação

E agora por final (pelo menos deste artigo) iremos criar o código que fará a autenticação do nosso usuário no sistema. Vamos lá?!

O trecho de código a seguir vai implementar o método CreateSession, que faz a autenticação de um cliente no Bluesky, criando uma sessão válida para ele. A ideia é pegar o identificador (e-mail ou handle, sendo o handle o @ do usuário) e a senha, enviar para a API, e receber de volta os dados da sessão, que são usados nas próximas interações com o sistema.

O que o código faz

Salvando a sessão: Depois de decodificar a resposta da API, o código armazena os dados da sessão na própria struct client, para que essa sessão seja usada nas próximas requisições que precisem de autenticação. Além de salvar a sessão no client, também retornamos para que a mesma possa ser usada em algum outro ponto.

c.session = &didResponse

Decodificando a resposta: Se a API respondeu com sucesso, o próximo passo é decodificar o corpo da resposta, que vem em JSON. O código usa um json.NewDecoder para pegar a resposta da API e transformá-la em uma struct AuthResponse, que contém informações importantes da sessão, como tokens de autenticação.

var didResponse dto.AuthResponse
if jsonDecoderErr := json.NewDecoder(resp.Body).Decode(&didResponse); jsonDecoderErr != nil {
    slog.Error(
        "error to decode json response",
        "error", jsonDecoderErr,
    )
    return nil, jsonDecoderErr
}

Caso ocorra algum erro ao decodificar, ele é tratado e o erro é logado.

Tratamento da resposta da API: Depois de fazer a requisição, o código verifica se a API respondeu com sucesso, ou seja, com um código de status 200 (que significa "OK"). Se o código de status for diferente, ele loga o erro e retorna uma mensagem dizendo que a criação da sessão falhou.

if resp.StatusCode != http.StatusOK {
    slog.Error("create session request failed", "status_code", resp.StatusCode)
    return nil, fmt.Errorf("create session request failed: %d", resp.StatusCode)
}

Enviando a requisição para a API: Agora que a gente tem o JSON prontinho com os dados da sessão, o código faz uma requisição HTTP POST para o endpoint da API que cria sessões (com.atproto.server.createSession).

url := fmt.Sprintf("%s/com.atproto.server.createSession", c.xrpcURL)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))

Se acontecer algum erro ao fazer a requisição, ele já retorna esse erro e interrompe a execução. Agora, se a requisição der certo, ele continua.

Montando a requisição de autenticação: Primeiro, o código cria uma struct SessionRequest que contém o identifier e a password que você passou para o método. Depois, ele transforma essa struct em JSON com o json.Marshal.

request := dto.SessionRequest{
    Identifier: identifier,
    Password:   password,
}
requestBody, _ := json.Marshal(request

O identifier pode ser um e-mail ou um handle, e a senha é exatamente a que o usuário cadastrou no Bluesky.

Fácil né? E de novo usando a biblioteca padrão do Go até agora, onde esse método é o ponto de entrada para a autenticação no sistema do Bluesky. Sem ele, a gente não conseguiria realizar as operações que precisam de uma sessão válida, como repostar, curtir, entre outras claro.

Próximos passos

Bom, agora que já temos uma base do nosso código pronta, no próximo artigo (para este não ficar muito longo), vamos ajustar o repositório do Github para configurar as permissões, adicionar testes automatizados, geração de tags de versão, entre outras coisas. E, por fim, vamos disponibilizar nosso pacote para uso!

Depois desses dois artigos, vamos retomar nossa série sobre Processando Eventos de Rede Social usando EDA, Go, RabbitMQ e AT Protocol. Então, até o próximo artigo!