My contributing journey to Mindsdb

My contributing journey to Mindsdb

Table of contents

No heading

No headings in the article.

In this article, I describe my experience contributing to MindsDB during the Hashnode hackathon organized by MindsDB.

MindsDB is a database in which you can create AI tables and train and deploy machine learning models using SQL. MindsDB also offers many integrations, which make data collection easier.

The Hashnode hackathon consists of building open-source software with Mindsdb or contributing to a Mindsdb project by writing a new integration.

I chose to contribute by creating a new integration for the News API. The API provides an endpoint for querying articles by source and keyword.

I chose to build this handler because, during the hackathon, I was working on a sentiment analysis project whose task was to detect whether a sentence is related to sport, nutrition, or something else. Building this integration helped me collect data and train my model.

I had never contributed to MindsDB before, so to create this handler I followed the documentation on how to build an API handler, which includes an example integration for Twitter.


The News API has three endpoints:

  • To query articles

  • To query top articles (headlines)

  • To query all available sources of articles

I chose to work only with the endpoint that fetches articles, because it can also be used to reproduce the results of the endpoint that retrieves top articles (headlines).


To create an API handler, I first created an APITable that holds the articles returned by the News API endpoint.

To create an API table, you need to create a class that inherits from APITable.

The APITable class lets you implement the following methods: select, insert, update, delete, and get_columns.

For the article table, I only had to implement select and get_columns:

class NewsAPIArticleTable(APITable):
    """Table of News API articles, queried through the owning handler.

    ``select`` translates the WHERE / LIMIT / ORDER BY parts of a parsed
    SELECT statement into keyword parameters, hands them to
    ``handler.call_application_api`` and returns the requested columns of
    the resulting DataFrame.
    """

    # Largest page size requested in a single API call.
    _MAX_PAGE_SIZE = 100
    # Upper bound on the number of comma-separated entries in ``sources``.
    _MAX_SOURCES = 20
    # Fields accepted in ORDER BY.
    _SORTABLE_FIELDS = ("relevancy", "publishedAt")
    # Comparison operators, lower-cased. Both the symbolic spelling and the
    # name-style spelling ("Gt", "GtE", ...) are accepted.
    # NOTE(review): extract_comparison_conditions is expected to yield the
    # symbolic form (">", ">=", ...) -- confirm against the MindsDB version
    # in use.
    _LOWER_BOUND_OPS = frozenset({">", ">=", "gt", "gte"})
    _UPPER_BOUND_OPS = frozenset({"<", "<=", "lt", "lte"})
    _EQUALITY_OPS = frozenset({"=", "==", "eq"})

    def __init__(self, handler):
        super().__init__(handler)

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Run a SELECT against the News API and return the matching rows.

        Args:
            query: Parsed SELECT statement. Supported WHERE columns are
                ``query`` (search text), ``sources`` (comma-separated, at
                most 20 entries) and ``publishedAt`` (date bounds); any
                other column is forwarded unchanged as an API parameter.

        Returns:
            A DataFrame restricted to the selected columns. When LIMIT is
            given, at most that many rows are returned.

        Raises:
            ValueError: If more than 20 sources are given, if several
                ORDER BY terms are used, or if a SELECT target is neither
                ``*`` nor a plain column.
            NotImplementedError: If ORDER BY uses an unsupported field.
        """
        conditions = extract_comparison_conditions(query.where)

        params = {}

        for op, arg1, arg2 in conditions:
            if arg1 == "query":
                params["q"] = urllib.parse.quote_plus(arg2)
            elif arg1 == "sources":
                if len(arg2.split(",")) > self._MAX_SOURCES:
                    raise ValueError(
                        "The number of items it sources should be 20 or less"
                    )
                params[arg1] = arg2
            elif arg1 == "publishedAt":
                op_name = str(op).lower()
                # NOTE(review): the lower bound is sent as ``from_param``
                # because ``from`` is a Python keyword and cannot be a
                # keyword-argument name in the client call; confirm the
                # spelling against the installed newsapi-python version.
                if op_name in self._LOWER_BOUND_OPS:
                    params["from_param"] = arg2
                elif op_name in self._UPPER_BOUND_OPS:
                    params["to"] = arg2
                elif op_name in self._EQUALITY_OPS:
                    params["from_param"] = arg2
                    params["to"] = arg2
            else:
                params[arg1] = arg2

        # ``page`` is the NUMBER of pages the handler should fetch (it loops
        # from 1 to ``page``), so a limit above one page needs full-size
        # pages and enough of them to cover the limit; the surplus rows are
        # trimmed after the fetch.
        limit = query.limit.value if query.limit else None
        if limit is None:
            params["page_size"] = self._MAX_PAGE_SIZE
            params["page"] = 1
        elif limit > self._MAX_PAGE_SIZE:
            params["page_size"] = self._MAX_PAGE_SIZE
            # Ceiling division without floating point.
            params["page"] = -(-limit // self._MAX_PAGE_SIZE)
        else:
            params["page_size"] = limit
            params["page"] = 1

        if query.order_by:
            if len(query.order_by) != 1:
                raise ValueError(
                    "Multiple order by condition is not supported by the API"
                )
            sort_field = str(query.order_by[0])
            if sort_field not in self._SORTABLE_FIELDS:
                raise NotImplementedError("Not supported ordering by this field")
            params["sort_by"] = sort_field

        result = self.handler.call_application_api(params=params)

        selected_columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                selected_columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        # A response without articles yields a DataFrame with no columns at
        # all; indexing it by name would raise KeyError, so return an empty
        # frame that still has the requested columns.
        if result.empty:
            return pd.DataFrame(columns=selected_columns)

        if limit is not None and len(result) > limit:
            result = result.head(limit)

        return result[selected_columns]

    def get_columns(self) -> list:
        """Return the column names exposed by the article table."""
        return [
            "author",
            "title",
            "description",
            "url",
            "urlToImage",
            "publishedAt",
            "content",
            "source_id",
            "source_name",
            "query",
            "searchIn",
            "domains",
            "excludedDomains",
        ]

To write an API handler, you need to override some methods, such as:

  • _register_table()

  • connect()

  • check_connection()

  • native_query()

  • call_application_api()

class NewsAPIHandler(APIHandler):
    """API handler that exposes News API articles as an ``article`` table.

    The API key is looked up, in order, in the ``connection_data`` keyword
    argument, the ``NEWSAPI_API_KEY`` environment variable and the
    ``newsAPI_handler`` section of the config.
    """

    def __init__(self, name: str, **kwargs):
        """Create the handler, build the API client and register the table.

        Args:
            name: Handler instance name, forwarded to the base class.
            **kwargs: May contain ``connection_data``, a dict that can hold
                the ``api_key``.
        """
        super().__init__(name)
        # Set before anything that may raise so that __del__ always finds it.
        self.is_connected = False
        self.api = None
        self._tables = {}

        # ``or {}`` also covers an explicit ``connection_data=None``.
        args = kwargs.get("connection_data") or {}
        self.connection_args = {}
        handler_config = Config().get("newsAPI_handler", {})

        for k in ["api_key"]:
            env_name = f"NEWSAPI_{k.upper()}"
            if k in args:
                self.connection_args[k] = args[k]
            elif env_name in os.environ:
                self.connection_args[k] = os.environ[env_name]
            elif k in handler_config:
                self.connection_args[k] = handler_config[k]

        self.api = self.create_connection()

        article = NewsAPIArticleTable(self)
        self._register_table("article", article)

    def __del__(self):
        # getattr guards against a partially constructed instance.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def disconnect(self):
        """
        Close any existing connections.

        Returns None when the handler was not connected, otherwise the
        (now False) connection flag.
        """

        if self.is_connected is False:
            return

        self.is_connected = False
        return self.is_connected

    def create_connection(self):
        """Build a new API client from the collected connection arguments."""
        return NewsApiClient(**self.connection_args)

    def _register_table(self, table_name: str, table_class: Any):
        """Store ``table_class`` under ``table_name`` for later lookup."""
        self._tables[table_name] = table_class

    def get_table(self, table_name: str):
        """Return the table registered as ``table_name``, or None."""
        return self._tables.get(table_name)

    def connect(self) -> HandlerStatusResponse:
        """Ensure an API client exists and mark the handler as connected.

        Returns:
            A successful HandlerStatusResponse, whether or not the handler
            was already connected (the original returned the raw client on
            the already-connected path, contradicting the annotation).
        """
        if self.is_connected is not True:
            self.api = self.create_connection()
            self.is_connected = True

        return HandlerStatusResponse(success=True)

    def check_connection(self) -> HandlerStatusResponse:
        """Verify the connection by requesting a single top headline.

        Returns:
            A HandlerStatusResponse whose ``success`` is True when the
            request went through; otherwise ``error_message`` holds the
            text of the exception.
        """
        response = HandlerStatusResponse(False)

        try:
            self.connect()

            self.api.get_top_headlines(page_size=1, page=1)
            response.success = True

        except Exception as e:
            # Exceptions do not generally carry a ``message`` attribute;
            # str() works for every exception type.
            response.error_message = str(e)

        return response

    def native_query(self, query: Any):
        """Parse ``query`` as SQL and run it against the ``article`` table.

        Returns:
            A HandlerResponse of type TABLE wrapping the selected rows.
        """
        # Named so that it does not shadow the ``ast`` module.
        parsed_query = parse_sql(query, dialect="mindsdb")
        table = self.get_table("article")
        data = table.select(parsed_query)
        return HandlerResponse(RESPONSE_TYPE.TABLE, data_frame=data)

    def call_application_api(
        self, method_name: str = None, params: dict = None
    ) -> pd.DataFrame:
        """Fetch articles page by page and flatten them into a DataFrame.

        Args:
            method_name: Unused; kept for interface compatibility.
            params: Keyword arguments for the client's ``get_everything``
                call. ``params["page"]`` is the NUMBER of pages to fetch
                (pages 1..page are requested); it defaults to 1.

        Returns:
            One row per article. The nested ``source`` object is replaced
            by ``source_id`` / ``source_name``, and the ``query``,
            ``searchIn``, ``domains`` and ``excludedDomains`` columns echo
            the request parameters.
        """
        if self.is_connected is False:
            self.connect()

        # Copy so the caller's dict is left untouched.
        request_params = dict(params or {})
        page_count = request_params.pop("page", 1)
        data = []

        for page in range(1, page_count + 1):
            result = self.api.get_everything(page=page, **request_params)
            for article in result["articles"]:
                # Tolerate a missing or null ``source`` object.
                source = article.pop("source", None) or {}
                article["source_id"] = source.get("id")
                article["source_name"] = source.get("name")
                article["query"] = request_params.get("q")
                article["searchIn"] = request_params.get("searchIn")
                article["domains"] = request_params.get("domains")
                article["excludedDomains"] = request_params.get("exclude_domains")
                data.append(article)

        return pd.DataFrame(data=data)

For testing my integration I wrote some unit tests to be sure everything works as expected:

class NewsApiHandlerTest(unittest.TestCase):
    """Integration tests for NewsAPIHandler.

    These tests call the real News API, so the placeholder ``api_key``
    below must be replaced by a valid key before running them. The numeric
    prefixes in the test names fix the execution order.
    """

    @classmethod
    def setUpClass(cls):
        cls.kwargs = {
            "connection_data": {"api_key": "Your_api_key"}
        }
        cls.handler = NewsAPIHandler("test_newsapi_handler", **cls.kwargs)

    def _assert_returns_table(self, sql):
        """Run ``sql`` through the handler and check a TABLE response comes back."""
        res = self.handler.native_query(sql)
        self.assertIs(res.type, RESPONSE_TYPE.TABLE)

    def test_0_connect(self):
        self.handler.connect()
        self.assertTrue(self.handler.is_connected)

    def test_1_check_connection(self):
        response = self.handler.check_connection()
        self.assertTrue(response.success)

    def test_2_select(self):
        self._assert_returns_table('SELECT * FROM article WHERE query="google"')

    def test_3_select(self):
        # A query without any search criteria is expected to be rejected.
        with self.assertRaises(NewsAPIException):
            self.handler.native_query("SELECT * FROM article")

    def test_4_select(self):
        # "google.com" is not expected to be a valid source identifier.
        with self.assertRaises(NewsAPIException):
            self.handler.native_query(
                'SELECT * FROM article WHERE query="google" AND sources="google.com"'
            )

    def test_5_select(self):
        self._assert_returns_table(
            'SELECT * FROM article WHERE query="google" AND sources="abc-news"'
        )

    def test_6_select(self):
        # Use a recent, relative date window instead of fixed dates.
        # NOTE(review): presumably the API rejects ranges too far in the
        # past on some plans -- confirm for the key being used.
        import datetime

        today = datetime.date.today()
        start = (today - datetime.timedelta(days=7)).isoformat()
        end = today.isoformat()
        self._assert_returns_table(
            f'SELECT * FROM article WHERE query="google" AND publishedAt >= "{start}" AND  publishedAt <= "{end}"'
        )

    def test_7_select(self):
        self._assert_returns_table(
            'SELECT * FROM article WHERE query="google" LIMIT 78'
        )

    def test_8_select(self):
        self._assert_returns_table(
            'SELECT * FROM article WHERE query="google" LIMIT 150'
        )

    def test_9_select(self):
        self._assert_returns_table(
            'SELECT * FROM article WHERE query="google" ORDER BY publishedAt'
        )

    def test_10_select(self):
        # Only "relevancy" and "publishedAt" are sortable.
        with self.assertRaises(NotImplementedError):
            self.handler.native_query(
                'SELECT * FROM article WHERE query="google" ORDER BY query'
            )

    def test_11_select(self):
        self._assert_returns_table(
            'SELECT * FROM article WHERE query="google" ORDER BY relevancy'
        )